Skip to main content

lash_core/runtime/
turn_loop.rs

1use super::*;
2
3fn trace_fields_from_outcome(
4    outcome: &TurnOutcome,
5) -> (
6    &'static str,
7    &'static str,
8    Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10    match outcome {
11        TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12            ("completed", "assistant_message", None)
13        }
14        TurnOutcome::Finished(TurnFinish::FinalValue { .. }) => ("completed", "final_value", None),
15        TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
16        TurnOutcome::AgentFrameSwitch { frame_id, .. } => (
17            "completed",
18            "agent_frame_switch",
19            Some(lash_trace::TraceAgentFrameSwitch {
20                frame_id: frame_id.clone(),
21            }),
22        ),
23        TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
24    }
25}
26
27fn trace_stop_reason(stop: &TurnStop) -> &'static str {
28    match stop {
29        TurnStop::Cancelled => "cancelled",
30        TurnStop::Incomplete => "incomplete",
31        TurnStop::InvalidInput => "invalid_input",
32        TurnStop::MaxTurns => "max_turns",
33        TurnStop::ToolFailure => "tool_failure",
34        TurnStop::ProviderError => "provider_error",
35        TurnStop::PluginAbort => "plugin_abort",
36        TurnStop::RuntimeError => "runtime_error",
37        TurnStop::SubmittedError { .. } => "submitted_error",
38        TurnStop::ToolError { .. } => "tool_error",
39    }
40}
41
42fn session_head_refresh_error(err: SessionError) -> RuntimeError {
43    RuntimeError::new(
44        RuntimeErrorCode::Other("session_head_refresh".to_string()),
45        err.to_string(),
46    )
47}
48
49#[derive(Clone, Copy)]
50enum SessionExecutionLeaseReleasePolicy {
51    FinalCommit,
52    KeepOnAgentFrameSwitch,
53}
54
55impl SessionExecutionLeaseReleasePolicy {
56    fn should_release(self, outcome: &TurnOutcome) -> bool {
57        match self {
58            Self::FinalCommit => true,
59            Self::KeepOnAgentFrameSwitch => {
60                !matches!(outcome, TurnOutcome::AgentFrameSwitch { .. })
61            }
62        }
63    }
64}
65
66fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
67    match payload {
68        crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
69        crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
70    }
71}
72
73fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
74    claim
75        .batches
76        .iter()
77        .map(|batch| batch.batch_id.clone())
78        .collect()
79}
80
81/// Measures the whole host-visible turn.
82///
83/// Opened before the runtime claims the turn (session-execution lease and
84/// queued-work/turn-input claims) and stamped onto the assembled turn after
85/// the final commit and post-persist hooks complete, so
86/// [`ExecutionSummary`](crate::ExecutionSummary) timing covers
87/// claim → final commit. Reads only the injected [`Clock`](crate::Clock):
88/// `started_at_ms` comes from the wall-clock source and the duration from the
89/// monotonic source, so deterministic clocks produce deterministic timing.
90#[derive(Clone, Copy)]
91struct TurnStopwatch {
92    started: std::time::Instant,
93    started_at_ms: u64,
94}
95
96impl TurnStopwatch {
97    fn start(clock: &dyn crate::Clock) -> Self {
98        Self {
99            started: clock.now(),
100            started_at_ms: clock.timestamp_ms(),
101        }
102    }
103
104    fn stamp(&self, turn: &mut AssembledTurn, clock: &dyn crate::Clock) {
105        turn.execution.started_at_ms = self.started_at_ms;
106        turn.execution.duration_ms = clock
107            .now()
108            .saturating_duration_since(self.started)
109            .as_millis() as u64;
110    }
111}
112
/// Builds the id of a sub-phase of a turn as `<parent_turn_id>:<phase>`.
fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
    [parent_turn_id, phase].join(":")
}
116
117fn scoped_child_turn_controller<'run>(
118    scoped_effect_controller: &'run ScopedEffectController<'_>,
119    session_id: &str,
120    turn_id: &str,
121) -> Result<ScopedEffectController<'run>, RuntimeError> {
122    ScopedEffectController::borrowed(
123        scoped_effect_controller.controller(),
124        ExecutionScope::turn(session_id, turn_id),
125    )
126}
127
128pub(in crate::runtime) fn queued_work_trace_payload(
129    boundary: crate::QueuedWorkClaimBoundary,
130    claim: &crate::QueuedWorkClaim,
131    causes: &[crate::TurnCause],
132) -> serde_json::Value {
133    serde_json::json!({
134        "boundary": boundary,
135        "claim_id": claim.claim_id,
136        "owner_id": claim.owner.owner_id,
137        "incarnation_id": claim.owner.incarnation_id,
138        "batch_ids": queued_work_batch_ids(claim),
139        "payload_types": claim.batches.iter()
140            .flat_map(|batch| batch.items.iter())
141            .map(|item| queued_work_payload_type(&item.payload))
142            .collect::<Vec<_>>(),
143        "causes": causes,
144    })
145}
146
147pub(in crate::runtime) fn queued_work_completion_trace_payload(
148    completions: &[crate::QueuedWorkCompletion],
149) -> serde_json::Value {
150    serde_json::json!({
151        "claims": completions.iter().map(|completion| {
152            serde_json::json!({
153                "session_id": completion.session_id,
154                "claim_id": completion.claim_id,
155                "batch_ids": completion.batch_ids,
156            })
157        }).collect::<Vec<_>>(),
158    })
159}
160
161async fn emit_queued_work_started_to_sink(
162    events: &dyn TurnActivitySink,
163    boundary: crate::QueuedWorkClaimBoundary,
164    claim: &crate::QueuedWorkClaim,
165    causes: Vec<crate::TurnCause>,
166) {
167    emit_turn_activity_to_sink(
168        events,
169        TurnActivity::independent(TurnEvent::QueuedWorkStarted {
170            boundary,
171            batch_ids: queued_work_batch_ids(claim),
172            causes,
173        }),
174    )
175    .await;
176}
177
178pub(in crate::runtime) async fn send_queued_work_started_event(
179    event_tx: &mpsc::Sender<RuntimeStreamEvent>,
180    boundary: crate::QueuedWorkClaimBoundary,
181    claim: &crate::QueuedWorkClaim,
182    causes: Vec<crate::TurnCause>,
183) {
184    send_turn_activity(
185        event_tx,
186        TurnActivityId::fresh(),
187        TurnEvent::QueuedWorkStarted {
188            boundary,
189            batch_ids: queued_work_batch_ids(claim),
190            causes,
191        },
192    )
193    .await;
194}
195
/// Everything `LashRuntime::finish_turn` needs to finalize, commit, and trace
/// a turn once its streaming loop has ended.
struct TurnFinishInput {
    /// Turn boundary whose state is finalized and then committed.
    turn_pipeline: TurnBoundary,
    /// Assembler that accumulated the turn and produces the `AssembledTurn`.
    assembler: TurnAssembler,
    /// Messages produced during the turn; handed to the read-state finalization.
    new_messages: crate::MessageSequence,
    /// Policy of the turn; its model id labels the turn's token-ledger entry.
    policy: RuntimeSessionPolicy,
    /// Turn index written into the pipeline state before assembly.
    turn_index: usize,
    /// Queued-work completions passed to the final commit and traced afterwards.
    queued_work_completions: Vec<crate::QueuedWorkCompletion>,
    /// Turn-input completions passed to the final commit.
    turn_input_completions: Vec<crate::TurnInputCompletion>,
    /// Turn id attached to trace events and to the final commit.
    trace_turn_id: String,
}
206
207impl LashRuntime {
208    fn max_context_tokens(&self) -> usize {
209        self.state.effective_policy().context_window_tokens()
210    }
211
212    async fn claim_session_execution_lease(
213        &self,
214        cancel: CancellationToken,
215        busy_is_error: bool,
216    ) -> Result<Option<SessionExecutionLeaseGuard>, RuntimeError> {
217        let Some(store) = self
218            .session
219            .as_ref()
220            .and_then(|session| session.history_store())
221        else {
222            return Ok(None);
223        };
224        match SessionExecutionLeaseGuard::try_acquire(
225            store,
226            &self.state.session_id,
227            &self.runtime_lease_owner,
228            self.host.core.control.lease_timings,
229            Arc::clone(&self.host.core.clock),
230            cancel,
231        )
232        .await
233        .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?
234        {
235            Some(lease) => Ok(Some(lease)),
236            None if busy_is_error => Err(RuntimeError::new(
237                RuntimeErrorCode::SessionExecutionBusy,
238                format!(
239                    "session `{}` is already executing on another runtime owner",
240                    self.state.session_id
241                ),
242            )),
243            None => Ok(None),
244        }
245    }
246
247    async fn settle_session_execution_lease<T>(
248        &self,
249        guard: Option<&SessionExecutionLeaseGuard>,
250        result: Result<T, RuntimeError>,
251    ) -> Result<T, RuntimeError> {
252        match result {
253            Ok(value) => {
254                if let Some(guard) = guard {
255                    guard.release_if_live().await.map_err(|err| {
256                        RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
257                    })?;
258                }
259                Ok(value)
260            }
261            Err(err) => {
262                if err.code != RuntimeErrorCode::StoreCommitFailed
263                    && let Some(guard) = guard
264                    && let Err(release_err) = guard.release_if_live().await
265                {
266                    tracing::warn!(
267                        error = %release_err,
268                        "failed to release session execution lease after runtime error"
269                    );
270                }
271                Err(err)
272            }
273        }
274    }
275
276    async fn ensure_session_execution_lease_live(
277        &self,
278        guard: Option<&SessionExecutionLeaseGuard>,
279    ) -> Result<(), RuntimeError> {
280        let Some(guard) = guard else {
281            return Ok(());
282        };
283        guard.refresh_or_mark_lost().await.map_err(|err| {
284            RuntimeError::new(
285                RuntimeErrorCode::SessionExecutionLeaseLost,
286                format!(
287                    "session execution lease for session `{}` was lost before commit: {err}",
288                    self.state.session_id
289                ),
290            )
291        })
292    }
293
294    async fn abandon_queued_work_claims_after_lease_loss(
295        &self,
296        err: &RuntimeError,
297        claims: &[crate::QueuedWorkClaim],
298    ) {
299        if err.code != RuntimeErrorCode::SessionExecutionLeaseLost || claims.is_empty() {
300            return;
301        }
302        let Some(store) = self
303            .session
304            .as_ref()
305            .and_then(|session| session.history_store())
306        else {
307            return;
308        };
309        for claim in claims {
310            if let Err(abandon_err) = store.abandon_queued_work_claim(claim).await {
311                tracing::warn!(
312                    error = %abandon_err,
313                    session_id = %claim.session_id,
314                    claim_id = %claim.claim_id,
315                    "failed to abandon queued work claim after session execution lease loss"
316                );
317            }
318        }
319    }
320
321    async fn abandon_turn_input_claims_after_lease_loss(
322        &self,
323        err: &RuntimeError,
324        claims: &[crate::TurnInputClaim],
325    ) {
326        if err.code != RuntimeErrorCode::SessionExecutionLeaseLost || claims.is_empty() {
327            return;
328        }
329        let Some(store) = self
330            .session
331            .as_ref()
332            .and_then(|session| session.history_store())
333        else {
334            return;
335        };
336        for claim in claims {
337            if let Err(abandon_err) = store.abandon_turn_input_claim(claim).await {
338                tracing::warn!(
339                    error = %abandon_err,
340                    session_id = %claim.session_id,
341                    claim_id = %claim.claim_id,
342                    "failed to abandon turn input claim after session execution lease loss"
343                );
344            }
345        }
346    }
347
348    #[doc(hidden)]
349    pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
350        self.turn_phase_probe = Some(probe);
351    }
352
353    fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
354        if let Some(probe) = self.turn_phase_probe.as_ref() {
355            probe.begin(phase);
356        }
357    }
358
359    fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
360        if let Some(probe) = self.turn_phase_probe.as_ref() {
361            probe.end(phase);
362        }
363    }
364
365    async fn finish_turn(
366        &mut self,
367        finish: TurnFinishInput,
368        events: &dyn EventSink,
369        scoped_effect_controller: &ScopedEffectController<'_>,
370        cancel_state: &CancellationToken,
371        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
372        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
373    ) -> Result<AssembledTurn, RuntimeError> {
374        let TurnFinishInput {
375            mut turn_pipeline,
376            assembler,
377            new_messages,
378            policy,
379            turn_index,
380            queued_work_completions,
381            turn_input_completions,
382            trace_turn_id,
383        } = finish;
384        self.policy = self.state.effective_policy().clone();
385        turn_pipeline.state_mut().policy = self.policy.clone();
386        turn_pipeline.state_mut().turn_index = turn_index;
387
388        let mut turn_usage_delta = {
389            let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
390            std::mem::take(&mut *ledger)
391        };
392        if assembler.token_usage.total() > 0 {
393            turn_usage_delta.push(TokenLedgerEntry {
394                source: "turn".to_string(),
395                model: policy.model.id.clone(),
396                usage: assembler.token_usage.clone(),
397            });
398        }
399        let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
400
401        turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
402        if assembler.token_usage.total() > 0 {
403            turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
404        }
405
406        let last_prompt_usage = assembler.last_llm_usage().and_then(normalize_prompt_usage);
407        turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
408        let assembled_state = turn_pipeline.export_state_for_assembly();
409        let assembled = assembler.finish(
410            assembled_state,
411            cancel_state.is_cancelled(),
412            None,
413            &self.host.core.control.termination,
414        );
415
416        let Some(session) = self.session.as_ref() else {
417            self.state.apply_snapshot(&assembled.state);
418            self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
419            return Ok(assembled);
420        };
421        self.ensure_session_execution_lease_live(session_execution_lease)
422            .await?;
423
424        let plugins = Arc::clone(session.plugins());
425        let manager = match self.runtime_session_services_for_turn(None) {
426            Ok(manager) => manager,
427            Err(err) => {
428                return Err(RuntimeError::new(
429                    RuntimeErrorCode::PluginSessionManager,
430                    err.to_string(),
431                ));
432            }
433        };
434
435        self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
436        let finalized = match plugins
437            .finalize_turn_with_phase_probe(
438                assembled,
439                manager.state_service(),
440                manager.lifecycle_service(),
441                manager.graph_service(),
442                self.turn_phase_probe.clone(),
443            )
444            .await
445        {
446            Ok(finalized) => finalized,
447            Err(err) => {
448                self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
449                return Err(RuntimeError::new(
450                    RuntimeErrorCode::PluginFinalizeTurn,
451                    err.to_string(),
452                ));
453            }
454        };
455        self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
456        self.ensure_session_execution_lease_live(session_execution_lease)
457            .await?;
458
459        let mut returned_turn = finalized.turn;
460        let release_session_execution_lease =
461            session_execution_lease_release_policy.should_release(&returned_turn.outcome);
462        self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
463        self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
464        let queued_work_completion_trace = queued_work_completions.clone();
465        let pending_attachment_ids = self
466            .host
467            .core
468            .durability
469            .attachment_store
470            .pending_manifest_commit_ids();
471        if let Err(err) = turn_pipeline
472            .final_commit(
473                &mut returned_turn,
474                self.session.as_mut(),
475                &turn_usage_delta,
476                Some(&trace_turn_id),
477                queued_work_completions,
478                turn_input_completions,
479                cancel_state.is_cancelled().then(|| trace_turn_id.clone()),
480                pending_attachment_ids.clone(),
481                release_session_execution_lease
482                    .then(|| session_execution_lease.map(SessionExecutionLeaseGuard::completion))
483                    .flatten(),
484            )
485            .await
486        {
487            self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
488            self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
489            return Err(err);
490        }
491        if release_session_execution_lease && let Some(lease) = session_execution_lease {
492            lease.mark_released();
493        }
494        self.host
495            .core
496            .durability
497            .attachment_store
498            .mark_manifest_committed(&pending_attachment_ids);
499        self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
500
501        emit_session_events_to_sink(events, finalized.events).await;
502        self.state = turn_pipeline.into_final_state();
503        if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
504            && let Some(session) = self.session.as_mut()
505        {
506            let protocol_session = Arc::clone(session.plugins().protocol_session());
507            let session_id = self.state.session_id.clone();
508            protocol_session
509                .restore_session(
510                    crate::plugin::ProtocolSessionContext::new(session, &session_id),
511                    &self.state,
512                )
513                .await
514                .map_err(|err| {
515                    RuntimeError::new(
516                        RuntimeErrorCode::Other("protocol_restore_session".to_string()),
517                        err.to_string(),
518                    )
519                })?;
520        }
521        if !queued_work_completion_trace.is_empty() {
522            crate::trace::emit_trace(
523                &self.host.core.tracing.trace_sink,
524                &self.host.core.tracing.trace_context,
525                lash_trace::TraceContext::default()
526                    .for_session(returned_turn.state.session_id.clone())
527                    .for_turn_index(returned_turn.state.turn_index)
528                    .for_turn(trace_turn_id.clone()),
529                lash_trace::TraceEvent::Custom {
530                    name: "queued_work.completed".to_string(),
531                    payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
532                },
533                self.host.core.clock.as_ref(),
534            );
535        }
536        self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
537        self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
538            .await?;
539        self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
540        self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
541
542        self.emit_completed_turn_trace(
543            &returned_turn.state,
544            &returned_turn.outcome,
545            &trace_turn_id,
546        );
547        Ok(returned_turn)
548    }
549
550    fn emit_completed_turn_trace(
551        &self,
552        state: &SessionSnapshot,
553        outcome: &TurnOutcome,
554        trace_turn_id: &str,
555    ) {
556        if self.host.core.tracing.trace_sink.is_none() {
557            return;
558        }
559
560        let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
561        crate::trace::emit_trace(
562            &self.host.core.tracing.trace_sink,
563            &self.host.core.tracing.trace_context,
564            lash_trace::TraceContext::default()
565                .for_session(state.session_id.clone())
566                .for_turn_index(state.turn_index)
567                .for_turn(trace_turn_id.to_string()),
568            lash_trace::TraceEvent::TurnCompleted {
569                status: status.to_string(),
570                done_reason: done_reason.to_string(),
571                agent_frame_switch,
572            },
573            self.host.core.clock.as_ref(),
574        );
575    }
576
577    async fn emit_turn_persisted_event(
578        &self,
579        returned_turn: &AssembledTurn,
580        scoped_effect_controller: &ScopedEffectController<'_>,
581        trace_turn_id: &str,
582    ) -> Result<(), RuntimeError> {
583        let Some(session) = self.session.as_ref() else {
584            return Ok(());
585        };
586        let Ok(manager) = self.runtime_session_services() else {
587            return Ok(());
588        };
589        let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
590        let phase_controller = scoped_child_turn_controller(
591            scoped_effect_controller,
592            &self.state.session_id,
593            &phase_turn_id,
594        )?;
595        let direct_completions = manager.direct_completion_client(
596            RuntimeEffectControllerHandle::borrowed(phase_controller),
597            Some(phase_turn_id),
598        );
599
600        session
601            .plugins()
602            .emit_runtime_event_with_phase_probe(
603                crate::PluginLifecycleEvent::TurnPersisted(Box::new(
604                    crate::SessionStateChangedContext {
605                        session_id: self.state.session_id.clone(),
606                        state: crate::SessionReadView::from_snapshot(&returned_turn.state),
607                        sessions: manager.state_service(),
608                        session_graph: manager.graph_service(),
609                        direct_completions,
610                    },
611                )),
612                self.turn_phase_probe.clone(),
613            )
614            .await;
615        Ok(())
616    }
617
618    /// Run a single turn and stream events to the host sink.
619    pub async fn stream_turn(
620        &mut self,
621        input: TurnInput,
622        opts: TurnOptions<'_>,
623    ) -> Result<AssembledTurn, RuntimeError> {
624        let stopwatch = TurnStopwatch::start(self.host.core.clock.as_ref());
625        let cancel = opts.cancel.clone();
626        let session_execution_lease = self
627            .claim_session_execution_lease(cancel.clone(), true)
628            .await?;
629        let scoped_effect_controller = opts.scoped_effect_controller();
630        let result = self
631            .stream_turn_with_scoped_effect_controller_inner(
632                input,
633                opts.events_or_noop(),
634                opts.turn_events_or_noop(),
635                scoped_effect_controller,
636                cancel,
637                None,
638                None,
639                session_execution_lease.as_ref(),
640                SessionExecutionLeaseReleasePolicy::FinalCommit,
641            )
642            .await
643            .map(|mut turn| {
644                stopwatch.stamp(&mut turn, self.host.core.clock.as_ref());
645                turn
646            });
647        self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
648            .await
649    }
650
651    pub async fn stream_next_queued_work(
652        &mut self,
653        opts: TurnOptions<'_>,
654    ) -> Result<Option<AssembledTurn>, RuntimeError> {
655        self.stream_queued_work(opts, None).await
656    }
657
658    pub async fn stream_selected_queued_work(
659        &mut self,
660        opts: TurnOptions<'_>,
661        batch_ids: &[String],
662    ) -> Result<Option<AssembledTurn>, RuntimeError> {
663        self.stream_queued_work(opts, Some(batch_ids)).await
664    }
665
666    async fn stream_queued_work(
667        &mut self,
668        opts: TurnOptions<'_>,
669        selected_batch_ids: Option<&[String]>,
670    ) -> Result<Option<AssembledTurn>, RuntimeError> {
671        let stopwatch = TurnStopwatch::start(self.host.core.clock.as_ref());
672        let cancel = opts.cancel.clone();
673        let Some(session_execution_lease) = self
674            .claim_session_execution_lease(cancel.clone(), false)
675            .await?
676        else {
677            return Ok(None);
678        };
679        let session_execution_fence = session_execution_lease.fence();
680        let Some(store) = self
681            .session
682            .as_ref()
683            .and_then(|session| session.history_store())
684        else {
685            session_execution_lease
686                .release_if_live()
687                .await
688                .map_err(|err| {
689                    RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
690                })?;
691            return Ok(None);
692        };
693        let drain_commands_before_turn_input = if selected_batch_ids.is_some() {
694            true
695        } else {
696            self.session_commands_precede_pending_turn_input(store.as_ref())
697                .await?
698        };
699        if drain_commands_before_turn_input {
700            loop {
701                match self
702                    .drain_next_session_command(&session_execution_fence)
703                    .await
704                {
705                    Ok(Some(_)) => {}
706                    Ok(None) => break,
707                    Err(err) => {
708                        let _ = session_execution_lease.release_if_live().await;
709                        return Err(err);
710                    }
711                }
712            }
713        }
714        if selected_batch_ids.is_none() {
715            let input_claim = store
716                .claim_next_turn_inputs(
717                    &self.state.session_id,
718                    &session_execution_fence,
719                    &self.runtime_lease_owner,
720                    self.host.core.control.lease_timings.ttl_ms(),
721                    64,
722                )
723                .await
724                .map_err(super::runtime_error_from_store_commit)?;
725            if let Some(input_claim) = input_claim {
726                let mut input = input_claim.materialize_for_turn();
727                let turn_id = input
728                    .trace_turn_id
729                    .clone()
730                    .or_else(|| Some(opts.execution_scope_id().to_owned()))
731                    .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
732                input.trace_turn_id = Some(turn_id.clone());
733                crate::trace::emit_trace(
734                    &self.host.core.tracing.trace_sink,
735                    &self.host.core.tracing.trace_context,
736                    lash_trace::TraceContext::default()
737                        .for_session(self.state.session_id.clone())
738                        .for_turn_index(self.state.turn_index + 1)
739                        .for_turn(turn_id.clone()),
740                    lash_trace::TraceEvent::Custom {
741                        name: "turn_input.claimed".to_string(),
742                        payload: serde_json::json!({
743                            "claim_id": &input_claim.claim_id,
744                            "input_ids": input_claim.inputs.iter().map(|input| input.input_id.clone()).collect::<Vec<_>>(),
745                        }),
746                    },
747                    self.host.core.clock.as_ref(),
748                );
749                let claim_for_abandon = input_claim.clone();
750                let scoped_effect_controller = opts.scoped_effect_controller();
751                let result = self
752                    .stream_turn_with_scoped_effect_controller_inner(
753                        input,
754                        opts.events_or_noop(),
755                        opts.turn_events_or_noop(),
756                        scoped_effect_controller,
757                        cancel,
758                        None,
759                        Some(input_claim),
760                        Some(&session_execution_lease),
761                        SessionExecutionLeaseReleasePolicy::FinalCommit,
762                    )
763                    .await
764                    .map(|mut turn| {
765                        stopwatch.stamp(&mut turn, self.host.core.clock.as_ref());
766                        Some(turn)
767                    });
768                if let Err(err) = &result {
769                    self.abandon_turn_input_claims_after_lease_loss(
770                        err,
771                        std::slice::from_ref(&claim_for_abandon),
772                    )
773                    .await;
774                }
775                return self
776                    .settle_session_execution_lease(Some(&session_execution_lease), result)
777                    .await;
778            }
779        }
780        let claim = if let Some(batch_ids) = selected_batch_ids {
781            store
782                .claim_ready_queued_work_by_batch_ids(
783                    &self.state.session_id,
784                    &session_execution_fence,
785                    &self.runtime_lease_owner,
786                    crate::QueuedWorkClaimBoundary::Idle,
787                    self.host.core.control.lease_timings.ttl_ms(),
788                    batch_ids,
789                )
790                .await
791        } else {
792            store
793                .claim_ready_queued_work(
794                    &self.state.session_id,
795                    &session_execution_fence,
796                    &self.runtime_lease_owner,
797                    crate::QueuedWorkClaimBoundary::Idle,
798                    self.host.core.control.lease_timings.ttl_ms(),
799                    64,
800                )
801                .await
802        }
803        .map_err(super::runtime_error_from_store_commit)?;
804        let Some(claim) = claim else {
805            session_execution_lease
806                .release_if_live()
807                .await
808                .map_err(|err| {
809                    RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
810                })?;
811            return Ok(None);
812        };
813        let mut work = claim.materialize_for_turn();
814        let turn_id = work
815            .input
816            .trace_turn_id
817            .clone()
818            .or_else(|| Some(opts.execution_scope_id().to_owned()))
819            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
820        work.input.trace_turn_id = Some(turn_id.clone());
821        let causes = work.turn_causes.clone();
822        emit_queued_work_started_to_sink(
823            opts.turn_events_or_noop(),
824            crate::QueuedWorkClaimBoundary::Idle,
825            &claim,
826            causes.clone(),
827        )
828        .await;
829        crate::trace::emit_trace(
830            &self.host.core.tracing.trace_sink,
831            &self.host.core.tracing.trace_context,
832            lash_trace::TraceContext::default()
833                .for_session(self.state.session_id.clone())
834                .for_turn_index(self.state.turn_index + 1)
835                .for_turn(turn_id.clone()),
836            lash_trace::TraceEvent::Custom {
837                name: "queued_work.claimed".to_string(),
838                payload: queued_work_trace_payload(
839                    crate::QueuedWorkClaimBoundary::Idle,
840                    &claim,
841                    &causes,
842                ),
843            },
844            self.host.core.clock.as_ref(),
845        );
846        let claim_for_abandon = claim.clone();
847        let scoped_effect_controller = opts.scoped_effect_controller();
848        let result = self
849            .stream_turn_with_scoped_effect_controller_inner(
850                work.input,
851                opts.events_or_noop(),
852                opts.turn_events_or_noop(),
853                scoped_effect_controller,
854                cancel,
855                Some(claim),
856                None,
857                Some(&session_execution_lease),
858                SessionExecutionLeaseReleasePolicy::FinalCommit,
859            )
860            .await
861            .map(|mut turn| {
862                stopwatch.stamp(&mut turn, self.host.core.clock.as_ref());
863                Some(turn)
864            });
865        if let Err(err) = &result {
866            self.abandon_queued_work_claims_after_lease_loss(
867                err,
868                std::slice::from_ref(&claim_for_abandon),
869            )
870            .await;
871        }
872        self.settle_session_execution_lease(Some(&session_execution_lease), result)
873            .await
874    }
875
876    async fn session_commands_precede_pending_turn_input(
877        &self,
878        store: &dyn crate::RuntimePersistence,
879    ) -> Result<bool, RuntimeError> {
880        let pending_inputs = store
881            .list_pending_turn_inputs(&self.state.session_id)
882            .await
883            .map_err(super::runtime_error_from_store_commit)?;
884        let earliest_input = pending_inputs
885            .iter()
886            .filter(|input| input.state.is_next_turn_pending())
887            .min_by_key(|input| (input.enqueued_at_ms, input.enqueue_seq));
888        let queued_work = store
889            .list_pending_queued_work(&self.state.session_id)
890            .await
891            .map_err(super::runtime_error_from_store_commit)?;
892        let earliest_command = queued_work
893            .iter()
894            .filter(|batch| batch.is_session_command_work())
895            .min_by_key(|batch| (batch.enqueued_at_ms, batch.enqueue_seq));
896        Ok(match (earliest_command, earliest_input) {
897            (Some(command), Some(input)) => command.enqueued_at_ms < input.enqueued_at_ms,
898            (Some(_), None) => true,
899            _ => false,
900        })
901    }
902
903    /// Enforce the durable-first wiring invariant at a turn-scope boundary: when
904    /// the host wired a durable effect host, every store reachable from this
905    /// scope must also be durable. A durable host running against any ephemeral
906    /// store fails loudly here rather than silently degrading.
907    ///
908    /// Inline controllers (the default tier) impose no requirement, so
909    /// inline/in-memory hosts pass unchanged.
910    fn ensure_durable_store_facets_for_scope(
911        &self,
912        scoped_effect_controller: &ScopedEffectController<'_>,
913    ) -> Result<(), RuntimeError> {
914        if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
915        {
916            return Ok(());
917        }
918        if self
919            .host
920            .core
921            .durability
922            .attachment_store
923            .persistence()
924            .durability_tier()
925            != crate::DurabilityTier::Durable
926        {
927            return Err(RuntimeError::durable_store_required(
928                crate::DurableStoreFacet::AttachmentStore,
929            ));
930        }
931        if self
932            .host
933            .core
934            .durability
935            .process_env_store
936            .durability_tier()
937            != crate::DurabilityTier::Durable
938        {
939            return Err(RuntimeError::durable_store_required(
940                crate::DurableStoreFacet::ProcessEnvStore,
941            ));
942        }
943        if let Some(store) = self
944            .session
945            .as_ref()
946            .and_then(|session| session.history_store())
947            && store.durability_tier() != crate::DurabilityTier::Durable
948        {
949            return Err(RuntimeError::durable_store_required(
950                crate::DurableStoreFacet::SessionStore,
951            ));
952        }
953        if let Some(process_registry) = self.host.process_registry.as_ref()
954            && process_registry.durability_tier() != crate::DurabilityTier::Durable
955        {
956            return Err(RuntimeError::durable_store_required(
957                crate::DurableStoreFacet::ProcessRegistry,
958            ));
959        }
960        Ok(())
961    }
962
963    #[allow(clippy::too_many_arguments)]
964    async fn stream_turn_with_scoped_effect_controller_inner(
965        &mut self,
966        mut input: TurnInput,
967        events: &dyn EventSink,
968        turn_events: &dyn TurnActivitySink,
969        scoped_effect_controller: ScopedEffectController<'_>,
970        cancel: CancellationToken,
971        queued_claim: Option<crate::QueuedWorkClaim>,
972        turn_input_claim: Option<crate::TurnInputClaim>,
973        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
974        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
975    ) -> Result<AssembledTurn, RuntimeError> {
976        if queued_claim.is_none() && turn_input_claim.is_none() {
977            if let Some(lease) = session_execution_lease {
978                while self
979                    .drain_next_session_command(&lease.fence())
980                    .await?
981                    .is_some()
982                {}
983            } else if self
984                .session
985                .as_ref()
986                .and_then(|session| session.history_store())
987                .is_some()
988            {
989                return Err(RuntimeError::new(
990                    RuntimeErrorCode::StoreCommitFailed,
991                    "session command drain requires a session execution lease",
992                ));
993            }
994        }
995        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
996            && scoped_effect_controller
997                .execution_scope()
998                .validates_turn_trace_id()
999            && input_turn_id != scoped_effect_controller.scope_id()
1000        {
1001            return Err(RuntimeError::new(
1002                RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
1003                format!(
1004                    "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
1005                    scoped_effect_controller.scope_id()
1006                ),
1007            ));
1008        }
1009        self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
1010        input
1011            .trace_turn_id
1012            .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
1013        self.stream_turn_inner(
1014            input.clone(),
1015            events,
1016            turn_events,
1017            scoped_effect_controller,
1018            cancel.clone(),
1019            queued_claim,
1020            turn_input_claim,
1021            session_execution_lease,
1022            session_execution_lease_release_policy,
1023        )
1024        .await
1025    }
1026
1027    /// Stream one logical host turn, following foreground AgentFrame switches
1028    /// until a terminal outcome is reached.
1029    ///
1030    /// A protocol continuation creates a new frame in the same session. Hosts
1031    /// that only care about the benchmark/app answer should not need to
1032    /// special-case that intermediate outcome; this helper keeps driving the
1033    /// same session through each frame's task with the normal runtime turn
1034    /// guards.
1035    pub async fn stream_turn_with_agent_frames(
1036        &mut self,
1037        input: TurnInput,
1038        opts: TurnOptions<'_>,
1039    ) -> Result<AgentFrameRun, RuntimeError> {
1040        let stopwatch = TurnStopwatch::start(self.host.core.clock.as_ref());
1041        let cancel = opts.cancel.clone();
1042        let session_execution_lease = self
1043            .claim_session_execution_lease(cancel.clone(), true)
1044            .await?;
1045        let scoped_effect_controller = opts.scoped_effect_controller();
1046        let result = self
1047            .stream_turn_with_agent_frames_inner(
1048                input,
1049                opts.events_or_noop(),
1050                opts.turn_events_or_noop(),
1051                scoped_effect_controller,
1052                cancel,
1053                session_execution_lease.as_ref(),
1054                stopwatch,
1055            )
1056            .await;
1057        self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
1058            .await
1059    }
1060
1061    #[allow(clippy::too_many_arguments)]
1062    async fn stream_turn_with_agent_frames_inner(
1063        &mut self,
1064        mut input: TurnInput,
1065        events: &dyn EventSink,
1066        turn_events: &dyn TurnActivitySink,
1067        scoped_effect_controller: ScopedEffectController<'_>,
1068        cancel: CancellationToken,
1069        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
1070        stopwatch: TurnStopwatch,
1071    ) -> Result<AgentFrameRun, RuntimeError> {
1072        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
1073            && scoped_effect_controller
1074                .execution_scope()
1075                .validates_turn_trace_id()
1076            && input_turn_id != scoped_effect_controller.scope_id()
1077        {
1078            return Err(RuntimeError::new(
1079                RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
1080                format!(
1081                    "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
1082                    scoped_effect_controller.scope_id()
1083                ),
1084            ));
1085        }
1086        let follow_protocol_turn_options = input.protocol_turn_options.clone();
1087        let follow_turn_context = input.turn_context.clone();
1088        let follow_trace_turn_id = input
1089            .trace_turn_id
1090            .clone()
1091            .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
1092        input
1093            .trace_turn_id
1094            .get_or_insert(follow_trace_turn_id.clone());
1095        let mut turns = Vec::new();
1096        loop {
1097            let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
1098            input.trace_turn_id = Some(turn_trace_turn_id.clone());
1099            let turn_effect_controller = if turns.is_empty() {
1100                scoped_effect_controller.clone()
1101            } else {
1102                ScopedEffectController::borrowed(
1103                    scoped_effect_controller.controller(),
1104                    ExecutionScope::turn(&self.state.session_id, &turn_trace_turn_id),
1105                )?
1106            };
1107            // The first frame's window opened before the lease claim; each
1108            // follow frame is timed from its own start so per-frame durations
1109            // stay honest.
1110            let frame_stopwatch = if turns.is_empty() {
1111                stopwatch
1112            } else {
1113                TurnStopwatch::start(self.host.core.clock.as_ref())
1114            };
1115            let mut turn = self
1116                .stream_turn_with_scoped_effect_controller_inner(
1117                    input,
1118                    events,
1119                    turn_events,
1120                    turn_effect_controller,
1121                    cancel.clone(),
1122                    None,
1123                    None,
1124                    session_execution_lease,
1125                    SessionExecutionLeaseReleasePolicy::KeepOnAgentFrameSwitch,
1126                )
1127                .await?;
1128            frame_stopwatch.stamp(&mut turn, self.host.core.clock.as_ref());
1129            let switched_frame = match &turn.outcome {
1130                TurnOutcome::AgentFrameSwitch { frame_id, task } => {
1131                    Some((frame_id.clone(), task.clone()))
1132                }
1133                _ => None,
1134            };
1135            turns.push(turn);
1136
1137            let Some((_frame_id, task)) = switched_frame else {
1138                return Ok(AgentFrameRun { turns });
1139            };
1140            input = turn_input_from_text(task);
1141            input.protocol_turn_options = follow_protocol_turn_options.clone();
1142            input.turn_context = follow_turn_context.clone();
1143        }
1144    }
1145
1146    #[allow(clippy::too_many_arguments)]
1147    async fn stream_turn_inner(
1148        &mut self,
1149        mut input: TurnInput,
1150        events: &dyn EventSink,
1151        turn_events: &dyn TurnActivitySink,
1152        scoped_effect_controller: ScopedEffectController<'_>,
1153        cancel: CancellationToken,
1154        queued_claim: Option<crate::QueuedWorkClaim>,
1155        turn_input_claim: Option<crate::TurnInputClaim>,
1156        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
1157        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
1158    ) -> Result<AssembledTurn, RuntimeError> {
1159        self.refresh_session_graph_from_store()
1160            .await
1161            .map_err(session_head_refresh_error)?;
1162        let input_trace_turn_id = input.trace_turn_id.clone();
1163        let queued_turn_work = queued_claim
1164            .as_ref()
1165            .map(crate::QueuedWorkClaim::materialize_for_turn);
1166        let pending_turn_input = turn_input_claim
1167            .as_ref()
1168            .map(crate::TurnInputClaim::materialize_for_turn);
1169        if let Some(work) = pending_turn_input.as_ref()
1170            && input.items.is_empty()
1171            && input.image_blobs.is_empty()
1172        {
1173            input = work.clone();
1174            if input.trace_turn_id.is_none() {
1175                input.trace_turn_id = input_trace_turn_id.clone();
1176            }
1177        }
1178        if let Some(work) = queued_turn_work.as_ref()
1179            && input.items.is_empty()
1180            && input.image_blobs.is_empty()
1181        {
1182            input = work.input.clone();
1183            if input.trace_turn_id.is_none() {
1184                input.trace_turn_id = input_trace_turn_id;
1185            }
1186        }
1187        if self
1188            .session
1189            .as_ref()
1190            .and_then(|session| session.history_store())
1191            .is_some()
1192        {
1193            ensure_durable_effect_input(&input)?;
1194        }
1195        if let Some(extension) = &input.protocol_extension
1196            && let Some(session) = self.session.as_ref()
1197        {
1198            let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
1199            protocol_session
1200                .validate_turn_extension(extension)
1201                .await
1202                .map_err(|err| {
1203                    RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
1204                })?;
1205        }
1206        let previous_prompt_usage = self.state.last_prompt_usage.clone();
1207        let normalized = match self
1208            .normalize_input_items(&input.items, &input.image_blobs)
1209            .await
1210        {
1211            Ok(items) => items,
1212            Err(e) => {
1213                self.state.last_prompt_usage = None;
1214                let mut assembler = TurnAssembler::default();
1215                let error_event = SessionEvent::Error {
1216                    message: e.clone(),
1217                    envelope: Some(crate::session_model::ErrorEnvelope {
1218                        kind: "input_validation".to_string(),
1219                        code: Some("invalid_turn_input".to_string()),
1220                        terminal_reason: None,
1221                        user_message: e.clone(),
1222                        raw: None,
1223                        retryable: Some(false),
1224                        provider_failure_kind: None,
1225                    }),
1226                };
1227                assembler.push(&error_event);
1228                emit_turn_activity_to_sink(
1229                    turn_events,
1230                    TurnActivity::independent(TurnEvent::Error { message: e }),
1231                )
1232                .await;
1233                emit_session_event_to_sink(events, error_event).await;
1234                let outcome_event = SessionEvent::TurnOutcome {
1235                    outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
1236                };
1237                assembler.push(&outcome_event);
1238                emit_session_event_to_sink(events, outcome_event).await;
1239                assembler.push(&SessionEvent::Done);
1240                emit_session_event_to_sink(events, SessionEvent::Done).await;
1241                return Ok(assembler.finish(
1242                    self.state.to_snapshot(),
1243                    false,
1244                    None,
1245                    &self.host.core.control.termination,
1246                ));
1247            }
1248        };
1249        let turn_index = self.state.turn_index + 1;
1250        let trace_turn_id = input
1251            .trace_turn_id
1252            .clone()
1253            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1254        if self.host.core.tracing.trace_sink.is_some() {
1255            let mut trace_metadata = std::collections::BTreeMap::new();
1256            trace_metadata.insert(
1257                "input_item_count".to_string(),
1258                serde_json::json!(normalized.len()),
1259            );
1260            crate::trace::emit_trace(
1261                &self.host.core.tracing.trace_sink,
1262                &self.host.core.tracing.trace_context,
1263                lash_trace::TraceContext::default()
1264                    .for_session(self.state.session_id.clone())
1265                    .for_turn_index(turn_index)
1266                    .for_turn(trace_turn_id.clone()),
1267                lash_trace::TraceEvent::TurnStarted {
1268                    metadata: trace_metadata,
1269                },
1270                self.host.core.clock.as_ref(),
1271            );
1272        }
1273
1274        let base_read_model = self.state.read_model();
1275        let base_messages = base_read_model.messages;
1276        let base_render_cache = base_read_model.prompt_render_cache;
1277        let mut turn_delta = Vec::new();
1278        let initial_turn_causes = queued_turn_work
1279            .as_ref()
1280            .map(|work| work.turn_causes.clone())
1281            .unwrap_or_default();
1282        turn_delta.extend(
1283            initial_turn_causes
1284                .iter()
1285                .map(crate::TurnCause::to_event_message),
1286        );
1287
1288        let user_id = fresh_message_id();
1289        let mut user_parts: Vec<Part> = Vec::new();
1290        for item in normalized {
1291            match item {
1292                NormalizedItem::Text(text) => {
1293                    if text.is_empty() {
1294                        continue;
1295                    }
1296                    user_parts.push(Part {
1297                        id: format!("{}.p{}", user_id, user_parts.len()),
1298                        kind: PartKind::Text,
1299                        content: text,
1300                        attachment: None,
1301                        tool_call_id: None,
1302                        tool_name: None,
1303                        tool_replay: None,
1304                        prune_state: PruneState::Intact,
1305                        reasoning_meta: None,
1306                        response_meta: None,
1307                    });
1308                }
1309                NormalizedItem::Image(reference) => {
1310                    user_parts.push(Part {
1311                        id: format!("{}.p{}", user_id, user_parts.len()),
1312                        kind: PartKind::Image,
1313                        content: String::new(),
1314                        attachment: Some(crate::session_model::message::PartAttachment {
1315                            reference,
1316                        }),
1317                        tool_call_id: None,
1318                        tool_name: None,
1319                        tool_replay: None,
1320                        prune_state: PruneState::Intact,
1321                        reasoning_meta: None,
1322                        response_meta: None,
1323                    });
1324                }
1325            }
1326        }
1327        if user_parts.is_empty() && initial_turn_causes.is_empty() {
1328            user_parts.push(Part {
1329                id: format!("{}.p0", user_id),
1330                kind: PartKind::Text,
1331                content: String::new(),
1332                attachment: None,
1333                tool_call_id: None,
1334                tool_name: None,
1335                tool_replay: None,
1336                prune_state: PruneState::Intact,
1337                reasoning_meta: None,
1338                response_meta: None,
1339            });
1340        }
1341        if !user_parts.is_empty() {
1342            reassign_part_ids(&user_id, &mut user_parts);
1343            turn_delta.push(Message {
1344                id: user_id.clone(),
1345                role: MessageRole::User,
1346                parts: shared_parts(user_parts),
1347                origin: None,
1348            });
1349        }
1350
1351        let manager = self
1352            .runtime_session_services_for_turn(None)
1353            .map_err(|err| {
1354                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1355            })?;
1356        let plugin_session = self
1357            .session
1358            .as_ref()
1359            .map(|s| Arc::clone(s.plugins()))
1360            .ok_or_else(|| {
1361                RuntimeError::new(
1362                    RuntimeErrorCode::ContextPrepareTurn,
1363                    "runtime session not available",
1364                )
1365            })?;
1366        let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
1367        let prepare_phase_controller = scoped_child_turn_controller(
1368            &scoped_effect_controller,
1369            &self.state.session_id,
1370            &prepare_phase_turn_id,
1371        )?;
1372        let turn_ctx = crate::TurnTransformContext {
1373            session_id: self.state.session_id.clone(),
1374            state: self.read_view(),
1375            prompt_usage: previous_prompt_usage.clone(),
1376            max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
1377            sessions: manager.state_service(),
1378            session_lifecycle: manager.lifecycle_service(),
1379            session_graph: manager.graph_service(),
1380            scoped_effect_controller: scoped_effect_controller.clone(),
1381            direct_completions: manager.direct_completion_client(
1382                RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
1383                Some(prepare_phase_turn_id),
1384            ),
1385        };
1386        self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
1387        let prepared_context = plugin_session
1388            .prepare_turn_context(
1389                &turn_ctx,
1390                crate::session_model::context::PreparedContext {
1391                    messages: crate::MessageSequence::from_base_and_delta(
1392                        base_messages,
1393                        turn_delta,
1394                    )
1395                    .with_base_render_cache(base_render_cache),
1396                    ..Default::default()
1397                },
1398                self.turn_phase_probe.clone(),
1399            )
1400            .await
1401            .map_err(|err| {
1402                RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
1403            })?;
1404        self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
1405        // Release the read-view's graph clone before the rest of the turn
1406        // runs. Keeping it alive into `stream_prepared_turn` forces the
1407        // post-turn `append_active_read_delta` to deep-clone the session
1408        // graph (Arc::make_mut with refcount > 1).
1409        drop(turn_ctx);
1410        let messages = prepared_context.messages;
1411        if let Some(session) = self.session.as_mut() {
1412            session
1413                .set_context_overlay(
1414                    prepared_context.tool_providers,
1415                    prepared_context.prompt_contributions,
1416                    prepared_context.include_base_tools,
1417                )
1418                .map_err(|err| {
1419                    RuntimeError::new(
1420                        RuntimeErrorCode::Other("session_tool_registry".to_string()),
1421                        err.to_string(),
1422                    )
1423                })?;
1424        }
1425
1426        self.state.last_prompt_usage = None;
1427        Box::pin(self.stream_prepared_turn_inner(
1428            messages,
1429            previous_prompt_usage,
1430            input.protocol_turn_options.clone(),
1431            input.protocol_extension.clone(),
1432            input.turn_context.clone(),
1433            initial_turn_causes,
1434            trace_turn_id,
1435            turn_index,
1436            events,
1437            turn_events,
1438            scoped_effect_controller,
1439            cancel,
1440            queued_claim,
1441            turn_input_claim,
1442            session_execution_lease,
1443            session_execution_lease_release_policy,
1444        ))
1445        .await
1446    }
1447
1448    /// Run a single turn and return only the assembled terminal result.
1449    pub async fn run_turn_assembled(
1450        &mut self,
1451        input: TurnInput,
1452        cancel: CancellationToken,
1453        scoped_effect_controller: ScopedEffectController<'_>,
1454    ) -> Result<AssembledTurn, RuntimeError> {
1455        self.stream_turn(input, TurnOptions::new(cancel, scoped_effect_controller))
1456            .await
1457    }
1458
1459    /// Run a turn using host-prepared message history.
1460    #[allow(clippy::too_many_arguments)]
1461    pub async fn stream_prepared_turn(
1462        &mut self,
1463        messages: crate::MessageSequence,
1464        previous_prompt_usage: Option<PromptUsage>,
1465        protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1466        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1467        turn_context: crate::TurnContext,
1468        initial_turn_causes: Vec<crate::TurnCause>,
1469        trace_turn_id: String,
1470        turn_index: usize,
1471        events: &dyn EventSink,
1472        turn_events: &dyn TurnActivitySink,
1473        scoped_effect_controller: ScopedEffectController<'_>,
1474        cancel: CancellationToken,
1475        initial_queue_claim: Option<crate::QueuedWorkClaim>,
1476        initial_turn_input_claim: Option<crate::TurnInputClaim>,
1477    ) -> Result<AssembledTurn, RuntimeError> {
1478        let stopwatch = TurnStopwatch::start(self.host.core.clock.as_ref());
1479        let session_execution_lease = self
1480            .claim_session_execution_lease(cancel.clone(), true)
1481            .await?;
1482        let result = self
1483            .stream_prepared_turn_inner(
1484                messages,
1485                previous_prompt_usage,
1486                protocol_turn_options,
1487                protocol_extension,
1488                turn_context,
1489                initial_turn_causes,
1490                trace_turn_id,
1491                turn_index,
1492                events,
1493                turn_events,
1494                scoped_effect_controller,
1495                cancel,
1496                initial_queue_claim,
1497                initial_turn_input_claim,
1498                session_execution_lease.as_ref(),
1499                SessionExecutionLeaseReleasePolicy::FinalCommit,
1500            )
1501            .await
1502            .map(|mut turn| {
1503                stopwatch.stamp(&mut turn, self.host.core.clock.as_ref());
1504                turn
1505            });
1506        self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
1507            .await
1508    }
1509
1510    #[allow(clippy::too_many_arguments)]
1511    async fn stream_prepared_turn_inner(
1512        &mut self,
1513        messages: crate::MessageSequence,
1514        _previous_prompt_usage: Option<PromptUsage>,
1515        protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1516        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1517        turn_context: crate::TurnContext,
1518        initial_turn_causes: Vec<crate::TurnCause>,
1519        trace_turn_id: String,
1520        turn_index: usize,
1521        events: &dyn EventSink,
1522        turn_events: &dyn TurnActivitySink,
1523        scoped_effect_controller: ScopedEffectController<'_>,
1524        cancel: CancellationToken,
1525        initial_queue_claim: Option<crate::QueuedWorkClaim>,
1526        initial_turn_input_claim: Option<crate::TurnInputClaim>,
1527        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
1528        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
1529    ) -> Result<AssembledTurn, RuntimeError> {
1530        if session_execution_lease.is_none()
1531            && self
1532                .session
1533                .as_ref()
1534                .and_then(|session| session.history_store())
1535                .is_some()
1536        {
1537            return Err(RuntimeError::new(
1538                RuntimeErrorCode::StoreCommitFailed,
1539                "prepared turn requires a session execution lease",
1540            ));
1541        }
1542        let session_execution_fence =
1543            session_execution_lease.map(SessionExecutionLeaseGuard::fence);
1544        let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
1545        let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
1546        let mut turn_policy = self.state.effective_policy().clone();
1547        let turn_provider_override = turn_context.provider().cloned();
1548        if let Some(provider) = turn_provider_override.as_ref() {
1549            turn_policy.provider_id = provider.kind().to_string();
1550        }
1551        if let Some(model) = turn_context.model_spec() {
1552            turn_policy.model = model.clone();
1553        }
1554        let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
1555        let effective_protocol_turn_options = protocol_turn_options
1556            .clone()
1557            .map(|options| session_protocol_turn_options.merged_with_override(&options))
1558            .unwrap_or(session_protocol_turn_options);
1559        let manager = self
1560            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1561            .map_err(|err| {
1562                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1563            })?;
1564        let plugins = {
1565            let session = self
1566                .session
1567                .as_ref()
1568                .expect("lash runtime session must be available");
1569            Arc::clone(session.plugins())
1570        };
1571        let mut assembler = TurnAssembler::new();
1572        self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
1573        // Block-scope the pinned future so it (and its captured
1574        // `SessionReadView` clone of the session graph) drops before the
1575        // post-turn `append_active_read_delta` mutation. Keeping it alive
1576        // across the turn forces `Arc::make_mut` to deep-clone
1577        // `SessionGraphData`.
1578        let prepared = {
1579            let prepare_turn = plugins.prepare_turn_with_phase_probe(
1580                PrepareTurnRequest {
1581                    session_id: self.state.session_id.clone(),
1582                    state: crate::SessionReadView::from_runtime_state(
1583                        &self.state,
1584                        turn_policy.clone(),
1585                        effective_protocol_turn_options.clone(),
1586                    ),
1587                    messages,
1588                    sessions: manager.state_service(),
1589                    session_lifecycle: manager.lifecycle_service(),
1590                    session_graph: manager.graph_service(),
1591                    turn_context: turn_context.clone(),
1592                },
1593                self.turn_phase_probe.clone(),
1594            );
1595            let mut prepare_turn = Box::pin(prepare_turn);
1596
1597            loop {
1598                tokio::select! {
1599                    prepared = prepare_turn.as_mut() => {
1600                        let prepared = prepared.map_err(|err| {
1601                            RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
1602                        })?;
1603                        self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
1604                        break prepared;
1605                    }
1606                    maybe_event = event_rx.recv() => {
1607                        if let Some(event) = maybe_event {
1608                            emit_runtime_stream_event_to_sinks(
1609                                events,
1610                                turn_events,
1611                                event,
1612                                &mut assembler,
1613                            )
1614                            .await;
1615                        }
1616                    }
1617                }
1618            }
1619        };
1620        for event in &prepared.events {
1621            assembler.push(event);
1622        }
1623        emit_session_events_to_sink(events, prepared.events).await;
1624        if let Some(abort) = prepared.abort {
1625            drop(event_tx);
1626
1627            let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1628                self.state.clone(),
1629                Arc::clone(&self.host.core.clock),
1630            );
1631            turn_pipeline.apply_prepared_messages(&prepared.messages);
1632            let state = turn_pipeline.into_final_state();
1633            let issue = TurnIssue {
1634                kind: "plugin".to_string(),
1635                code: Some(abort.code),
1636                terminal_reason: None,
1637                message: abort.message.clone(),
1638                raw: None,
1639                retryable: None,
1640                provider_failure_kind: None,
1641            };
1642            let error_event = SessionEvent::Error {
1643                message: abort.message,
1644                envelope: Some(crate::session_model::ErrorEnvelope {
1645                    kind: "plugin".to_string(),
1646                    code: issue.code.clone(),
1647                    terminal_reason: None,
1648                    user_message: issue.message.clone(),
1649                    raw: None,
1650                    retryable: None,
1651                    provider_failure_kind: None,
1652                }),
1653            };
1654            assembler.push(&error_event);
1655            emit_turn_activity_to_sink(
1656                turn_events,
1657                TurnActivity::independent(TurnEvent::Error {
1658                    message: issue.message.clone(),
1659                }),
1660            )
1661            .await;
1662            emit_session_event_to_sink(events, error_event).await;
1663            let outcome_event = SessionEvent::TurnOutcome {
1664                outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
1665            };
1666            assembler.push(&outcome_event);
1667            emit_session_event_to_sink(events, outcome_event).await;
1668            assembler.push(&SessionEvent::Done);
1669            emit_session_event_to_sink(events, SessionEvent::Done).await;
1670            return Ok(assembler.finish(
1671                state.to_snapshot(),
1672                cancel.is_cancelled(),
1673                Some(issue),
1674                &self.host.core.control.termination,
1675            ));
1676        }
1677        let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1678            self.state.clone(),
1679            Arc::clone(&self.host.core.clock),
1680        )
1681        .with_session_execution_lease(session_execution_fence.clone());
1682        let store = self
1683            .session
1684            .as_ref()
1685            .and_then(|session| session.history_store());
1686        // Durable controllers, like Restate, own in-flight replay. Writing
1687        // progress checkpoints directly to the shared store would make handler
1688        // replay observe a newer partial turn and change effect replay keys.
1689        let progress_store = if scoped_effect_controller.controller().durability_tier()
1690            == crate::DurabilityTier::Durable
1691        {
1692            None
1693        } else {
1694            store.as_ref().map(|store| store.as_ref())
1695        };
1696        turn_pipeline
1697            .prepared_checkpoint(
1698                progress_store,
1699                turn_policy.clone(),
1700                turn_index,
1701                &prepared.messages,
1702                self.session.as_mut(),
1703            )
1704            .await
1705            .map_err(super::runtime_error_from_store_commit)?;
1706        let resolved_turn_policy = if let Some(provider) = turn_provider_override {
1707            RuntimeSessionPolicy::from_provider(
1708                turn_policy.clone(),
1709                provider.with_clock(Arc::clone(&self.host.core.clock)),
1710            )
1711            .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1712        } else {
1713            self.host
1714                .resolve_session_policy(&self.state.session_id, turn_policy.clone())
1715                .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1716        };
1717        let manager = self
1718            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1719            .map_err(|err| {
1720                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1721            })?;
1722        let cancel_state = cancel.clone();
1723        let finish_scoped_effect_controller = scoped_effect_controller.clone();
1724        let session = self
1725            .session
1726            .take()
1727            .expect("lash runtime session must be available");
1728        let mut driver = RuntimeTurnDriver {
1729            session,
1730            policy: resolved_turn_policy,
1731            host: self.host.clone(),
1732            turn_id: scoped_effect_controller.scope_id().to_string(),
1733            scoped_effect_controller,
1734            session_id: self.state.session_id.clone(),
1735            turn_index,
1736            turn_pipeline,
1737            llm_stream_summaries: HashMap::new(),
1738            next_llm_ordinal: 0,
1739            session_services: manager,
1740            protocol_turn_options: effective_protocol_turn_options,
1741            protocol_extension,
1742            turn_context,
1743            turn_causes: initial_turn_causes,
1744            pending_queue_claims: initial_queue_claim.into_iter().collect(),
1745            pending_turn_input_claims: initial_turn_input_claim.into_iter().collect(),
1746            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1747            session_execution_lease: session_execution_fence,
1748            runtime_lease_owner: self.runtime_lease_owner.clone(),
1749            turn_phase_probe: self.turn_phase_probe.clone(),
1750        };
1751        let protocol_run_offset = 0;
1752        self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
1753        let run_result = drive_turn_to_completion(
1754            driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
1755            &mut event_rx,
1756            &mut assembler,
1757            &child_usage_event_relay,
1758            events,
1759            turn_events,
1760        )
1761        .await;
1762        let (new_messages, _new_protocol_iteration) = match run_result {
1763            Ok(result) => result,
1764            Err(err) => {
1765                self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1766                let RuntimeTurnDriver {
1767                    session,
1768                    pending_queue_claims,
1769                    pending_turn_input_claims,
1770                    ..
1771                } = driver;
1772                self.session = Some(session);
1773                self.abandon_queued_work_claims_after_lease_loss(&err, &pending_queue_claims)
1774                    .await;
1775                self.abandon_turn_input_claims_after_lease_loss(&err, &pending_turn_input_claims)
1776                    .await;
1777                return Err(err);
1778            }
1779        };
1780        self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1781        tracing::debug!(
1782            new_message_count = new_messages.len(),
1783            tool_call_count = assembler.tool_calls.len(),
1784            "runtime post-run_task"
1785        );
1786
1787        let RuntimeTurnDriver {
1788            session,
1789            policy,
1790            turn_pipeline,
1791            pending_queue_claims,
1792            pending_turn_input_claims,
1793            ..
1794        } = driver;
1795        self.session = Some(session);
1796        let pending_queue_claims_for_abandon = pending_queue_claims.clone();
1797        let pending_turn_input_claims_for_abandon = pending_turn_input_claims.clone();
1798        let finish_result = self
1799            .finish_turn(
1800                TurnFinishInput {
1801                    turn_pipeline,
1802                    assembler,
1803                    new_messages,
1804                    policy,
1805                    turn_index,
1806                    queued_work_completions: pending_queue_claims
1807                        .iter()
1808                        .map(crate::QueuedWorkClaim::completion)
1809                        .collect(),
1810                    turn_input_completions: pending_turn_input_claims
1811                        .iter()
1812                        .map(crate::TurnInputClaim::completion)
1813                        .collect(),
1814                    trace_turn_id,
1815                },
1816                events,
1817                &finish_scoped_effect_controller,
1818                &cancel_state,
1819                session_execution_lease,
1820                session_execution_lease_release_policy,
1821            )
1822            .await;
1823        if let Err(err) = &finish_result {
1824            self.abandon_queued_work_claims_after_lease_loss(
1825                err,
1826                &pending_queue_claims_for_abandon,
1827            )
1828            .await;
1829            self.abandon_turn_input_claims_after_lease_loss(
1830                err,
1831                &pending_turn_input_claims_for_abandon,
1832            )
1833            .await;
1834        }
1835        finish_result
1836    }
1837    async fn normalize_input_items(
1838        &self,
1839        items: &[InputItem],
1840        image_blobs: &HashMap<String, Vec<u8>>,
1841    ) -> Result<Vec<NormalizedItem>, String> {
1842        normalize_input_items(
1843            items,
1844            image_blobs,
1845            self.host.core.durability.attachment_store.as_ref(),
1846        )
1847        .await
1848    }
1849}
1850
1851fn turn_input_from_text(text: String) -> TurnInput {
1852    TurnInput {
1853        items: vec![InputItem::Text { text }],
1854        image_blobs: HashMap::new(),
1855        protocol_turn_options: None,
1856        trace_turn_id: None,
1857        protocol_extension: None,
1858        turn_context: crate::TurnContext::default(),
1859    }
1860}
1861
/// Derives the turn id for a turn in an agent-frame follow chain.
///
/// With no completed turns the root id is returned unchanged; otherwise the
/// id is `"{root_turn_id}:agent-frame:{completed_turn_count}"`.
fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
    match completed_turn_count {
        0 => root_turn_id.to_owned(),
        follow_index => format!("{root_turn_id}:agent-frame:{follow_index}"),
    }
}
1869
1870pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1871    if input.protocol_extension.is_some() {
1872        return Err(RuntimeError::new(
1873            RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1874            "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1875        ));
1876    }
1877    input
1878        .turn_context
1879        .live_plugin_inputs()
1880        .durable_effect_rejection()?;
1881    Ok(())
1882}
1883
1884async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1885    if !events.is_noop() {
1886        events.emit(activity).await;
1887    }
1888}
1889
1890/// Pump the turn driver's event channel into the host sinks while the run
1891/// future executes, then drain any events emitted between completion and the
1892/// sender dropping.
1893///
1894/// Both the fresh and resumed turn entry points construct a
1895/// `RuntimeTurnDriver`, kick off its run future, and need identical
1896/// event-pump/drain behavior before tearing the driver down. Only the driver
1897/// construction and post-run teardown differ, so each caller owns those and
1898/// shares this loop.
1899async fn drive_turn_to_completion<F>(
1900    run_future: F,
1901    event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1902    assembler: &mut TurnAssembler,
1903    child_usage_event_relay: &ChildUsageEventRelay,
1904    events: &dyn EventSink,
1905    turn_events: &dyn TurnActivitySink,
1906) -> Result<(crate::MessageSequence, usize), RuntimeError>
1907where
1908    F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1909{
1910    let run_result = {
1911        let mut run_future = Box::pin(run_future);
1912        loop {
1913            tokio::select! {
1914                maybe_event = event_rx.recv() => {
1915                    if let Some(event) = maybe_event {
1916                        emit_runtime_stream_event_to_sinks(
1917                            events,
1918                            turn_events,
1919                            event,
1920                            assembler,
1921                        )
1922                        .await;
1923                    }
1924                }
1925                completed = run_future.as_mut() => {
1926                    child_usage_event_relay.clear();
1927                    break completed;
1928                }
1929            }
1930        }
1931    };
1932    while let Some(event) = event_rx.recv().await {
1933        emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1934    }
1935    run_result
1936}
1937
1938async fn emit_runtime_stream_event_to_sinks(
1939    events: &dyn EventSink,
1940    turn_events: &dyn TurnActivitySink,
1941    event: RuntimeStreamEvent,
1942    assembler: &mut TurnAssembler,
1943) {
1944    match event {
1945        RuntimeStreamEvent::Session(event) => {
1946            assembler.push(&event);
1947            emit_session_event_to_sink(events, event).await;
1948        }
1949        RuntimeStreamEvent::Turn(activity) => {
1950            emit_turn_activity_to_sink(turn_events, activity).await;
1951        }
1952    }
1953}
1954
#[cfg(test)]
mod tests {
    use super::agent_frame_follow_turn_id;

    /// The first turn keeps the root id; each follow-up turn gets a suffix
    /// built from the number of completed turns.
    #[test]
    fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
        let expectations = [
            (0, "root-turn"),
            (1, "root-turn:agent-frame:1"),
            (2, "root-turn:agent-frame:2"),
        ];
        for &(completed_turn_count, expected) in &expectations {
            assert_eq!(
                agent_frame_follow_turn_id("root-turn", completed_turn_count),
                expected
            );
        }
    }
}