Skip to main content

lash_core/runtime/
turn_loop.rs

1use super::*;
2
3fn trace_fields_from_outcome(
4    outcome: &TurnOutcome,
5) -> (
6    &'static str,
7    &'static str,
8    Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10    match outcome {
11        TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12            ("completed", "assistant_message", None)
13        }
14        TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
15            ("completed", "submitted_value", None)
16        }
17        TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
18        TurnOutcome::AgentFrameSwitch { frame_id, .. } => (
19            "completed",
20            "agent_frame_switch",
21            Some(lash_trace::TraceAgentFrameSwitch {
22                frame_id: frame_id.clone(),
23            }),
24        ),
25        TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
26    }
27}
28
29fn trace_stop_reason(stop: &TurnStop) -> &'static str {
30    match stop {
31        TurnStop::Cancelled => "cancelled",
32        TurnStop::Incomplete => "incomplete",
33        TurnStop::InvalidInput => "invalid_input",
34        TurnStop::MaxTurns => "max_turns",
35        TurnStop::ToolFailure => "tool_failure",
36        TurnStop::ProviderError => "provider_error",
37        TurnStop::PluginAbort => "plugin_abort",
38        TurnStop::RuntimeError => "runtime_error",
39        TurnStop::SubmittedError { .. } => "submitted_error",
40        TurnStop::ToolError { .. } => "tool_error",
41    }
42}
43
44fn session_head_refresh_error(err: SessionError) -> RuntimeError {
45    RuntimeError::new(
46        RuntimeErrorCode::Other("session_head_refresh".to_string()),
47        err.to_string(),
48    )
49}
50
51#[derive(Clone, Copy)]
52enum SessionExecutionLeaseReleasePolicy {
53    FinalCommit,
54    KeepOnAgentFrameSwitch,
55}
56
57impl SessionExecutionLeaseReleasePolicy {
58    fn should_release(self, outcome: &TurnOutcome) -> bool {
59        match self {
60            Self::FinalCommit => true,
61            Self::KeepOnAgentFrameSwitch => {
62                !matches!(outcome, TurnOutcome::AgentFrameSwitch { .. })
63            }
64        }
65    }
66}
67
68fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
69    match payload {
70        crate::QueuedWorkPayload::TurnInput { .. } => "turn_input",
71        crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
72        crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
73    }
74}
75
76fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
77    claim
78        .batches
79        .iter()
80        .map(|batch| batch.batch_id.clone())
81        .collect()
82}
83
/// Builds the id of a child turn that runs one named phase of a parent turn,
/// formatted as `<parent_turn_id>:<phase>`.
fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
    [parent_turn_id, phase].join(":")
}
87
88fn scoped_child_turn_controller<'run>(
89    scoped_effect_controller: &'run ScopedEffectController<'_>,
90    session_id: &str,
91    turn_id: &str,
92) -> Result<ScopedEffectController<'run>, RuntimeError> {
93    ScopedEffectController::borrowed(
94        scoped_effect_controller.controller(),
95        ExecutionScope::turn(session_id, turn_id),
96    )
97}
98
99pub(in crate::runtime) fn queued_work_trace_payload(
100    boundary: crate::QueuedWorkClaimBoundary,
101    claim: &crate::QueuedWorkClaim,
102    causes: &[crate::TurnCause],
103) -> serde_json::Value {
104    serde_json::json!({
105        "boundary": boundary,
106        "claim_id": claim.claim_id,
107        "owner_id": claim.owner_id,
108        "batch_ids": queued_work_batch_ids(claim),
109        "payload_types": claim.batches.iter()
110            .flat_map(|batch| batch.items.iter())
111            .map(|item| queued_work_payload_type(&item.payload))
112            .collect::<Vec<_>>(),
113        "causes": causes,
114    })
115}
116
117pub(in crate::runtime) fn queued_work_completion_trace_payload(
118    completions: &[crate::QueuedWorkCompletion],
119) -> serde_json::Value {
120    serde_json::json!({
121        "claims": completions.iter().map(|completion| {
122            serde_json::json!({
123                "session_id": completion.session_id,
124                "claim_id": completion.claim_id,
125                "batch_ids": completion.batch_ids,
126            })
127        }).collect::<Vec<_>>(),
128    })
129}
130
131async fn emit_queued_work_started_to_sink(
132    events: &dyn TurnActivitySink,
133    boundary: crate::QueuedWorkClaimBoundary,
134    claim: &crate::QueuedWorkClaim,
135    causes: Vec<crate::TurnCause>,
136) {
137    emit_turn_activity_to_sink(
138        events,
139        TurnActivity::independent(TurnEvent::QueuedWorkStarted {
140            boundary,
141            batch_ids: queued_work_batch_ids(claim),
142            causes,
143        }),
144    )
145    .await;
146}
147
148pub(in crate::runtime) async fn send_queued_work_started_event(
149    event_tx: &mpsc::Sender<RuntimeStreamEvent>,
150    boundary: crate::QueuedWorkClaimBoundary,
151    claim: &crate::QueuedWorkClaim,
152    causes: Vec<crate::TurnCause>,
153) {
154    send_turn_activity(
155        event_tx,
156        TurnActivityId::fresh(),
157        TurnEvent::QueuedWorkStarted {
158            boundary,
159            batch_ids: queued_work_batch_ids(claim),
160            causes,
161        },
162    )
163    .await;
164}
165
166struct TurnFinishInput {
167    turn_pipeline: TurnBoundary,
168    assembler: TurnAssembler,
169    new_messages: crate::MessageSequence,
170    policy: RuntimeSessionPolicy,
171    turn_index: usize,
172    queued_work_completions: Vec<crate::QueuedWorkCompletion>,
173    trace_turn_id: String,
174}
175
176impl LashRuntime {
177    fn max_context_tokens(&self) -> usize {
178        self.state.effective_policy().context_window_tokens()
179    }
180
181    async fn claim_session_execution_lease(
182        &self,
183        cancel: CancellationToken,
184        busy_is_error: bool,
185    ) -> Result<Option<SessionExecutionLeaseGuard>, RuntimeError> {
186        let Some(store) = self
187            .session
188            .as_ref()
189            .and_then(|session| session.history_store())
190        else {
191            return Ok(None);
192        };
193        match SessionExecutionLeaseGuard::try_acquire(
194            store,
195            &self.state.session_id,
196            &self.runtime_scope_id,
197            Arc::clone(&self.host.core.clock),
198            cancel,
199        )
200        .await
201        .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?
202        {
203            Some(lease) => Ok(Some(lease)),
204            None if busy_is_error => Err(RuntimeError::new(
205                RuntimeErrorCode::SessionExecutionBusy,
206                format!(
207                    "session `{}` is already executing on another runtime owner",
208                    self.state.session_id
209                ),
210            )),
211            None => Ok(None),
212        }
213    }
214
215    async fn settle_session_execution_lease<T>(
216        &self,
217        guard: Option<&SessionExecutionLeaseGuard>,
218        result: Result<T, RuntimeError>,
219    ) -> Result<T, RuntimeError> {
220        match result {
221            Ok(value) => {
222                if let Some(guard) = guard {
223                    guard.release_if_live().await.map_err(|err| {
224                        RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
225                    })?;
226                }
227                Ok(value)
228            }
229            Err(err) => {
230                if err.code != RuntimeErrorCode::StoreCommitFailed
231                    && let Some(guard) = guard
232                    && let Err(release_err) = guard.release_if_live().await
233                {
234                    tracing::warn!(
235                        error = %release_err,
236                        "failed to release session execution lease after runtime error"
237                    );
238                }
239                Err(err)
240            }
241        }
242    }
243
244    async fn ensure_session_execution_lease_live(
245        &self,
246        guard: Option<&SessionExecutionLeaseGuard>,
247    ) -> Result<(), RuntimeError> {
248        let Some(guard) = guard else {
249            return Ok(());
250        };
251        guard.refresh_or_mark_lost().await.map_err(|err| {
252            RuntimeError::new(
253                RuntimeErrorCode::SessionExecutionLeaseLost,
254                format!(
255                    "session execution lease for session `{}` was lost before commit: {err}",
256                    self.state.session_id
257                ),
258            )
259        })
260    }
261
262    async fn abandon_queued_work_claims_after_lease_loss(
263        &self,
264        err: &RuntimeError,
265        claims: &[crate::QueuedWorkClaim],
266    ) {
267        if err.code != RuntimeErrorCode::SessionExecutionLeaseLost || claims.is_empty() {
268            return;
269        }
270        let Some(store) = self
271            .session
272            .as_ref()
273            .and_then(|session| session.history_store())
274        else {
275            return;
276        };
277        for claim in claims {
278            if let Err(abandon_err) = store.abandon_queued_work_claim(claim).await {
279                tracing::warn!(
280                    error = %abandon_err,
281                    session_id = %claim.session_id,
282                    claim_id = %claim.claim_id,
283                    "failed to abandon queued work claim after session execution lease loss"
284                );
285            }
286        }
287    }
288
289    #[doc(hidden)]
290    pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
291        self.turn_phase_probe = Some(probe);
292    }
293
294    fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
295        if let Some(probe) = self.turn_phase_probe.as_ref() {
296            probe.begin(phase);
297        }
298    }
299
300    fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
301        if let Some(probe) = self.turn_phase_probe.as_ref() {
302            probe.end(phase);
303        }
304    }
305
306    async fn finish_turn(
307        &mut self,
308        finish: TurnFinishInput,
309        events: &dyn EventSink,
310        scoped_effect_controller: &ScopedEffectController<'_>,
311        cancel_state: &CancellationToken,
312        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
313        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
314    ) -> Result<AssembledTurn, RuntimeError> {
315        let TurnFinishInput {
316            mut turn_pipeline,
317            assembler,
318            new_messages,
319            policy,
320            turn_index,
321            queued_work_completions,
322            trace_turn_id,
323        } = finish;
324        self.policy = self.state.effective_policy().clone();
325        turn_pipeline.state_mut().policy = self.policy.clone();
326        turn_pipeline.state_mut().turn_index = turn_index;
327
328        let mut turn_usage_delta = {
329            let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
330            std::mem::take(&mut *ledger)
331        };
332        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
333            turn_usage_delta.push(TokenLedgerEntry {
334                source: "turn".to_string(),
335                model: policy.model.id.clone(),
336                usage: assembler.token_usage.clone(),
337            });
338        }
339        let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
340
341        turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
342        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
343            turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
344        }
345
346        let last_prompt_usage = assembler
347            .last_llm_usage()
348            .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
349        turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
350        let assembled_state = turn_pipeline.export_state_for_assembly();
351        let assembled = assembler.finish(
352            assembled_state,
353            cancel_state.is_cancelled(),
354            None,
355            &self.host.core.control.termination,
356        );
357
358        let Some(session) = self.session.as_ref() else {
359            self.state.apply_snapshot(&assembled.state);
360            self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
361            return Ok(assembled);
362        };
363        self.ensure_session_execution_lease_live(session_execution_lease)
364            .await?;
365
366        let plugins = Arc::clone(session.plugins());
367        let manager = match self.runtime_session_services_for_turn(None) {
368            Ok(manager) => manager,
369            Err(err) => {
370                return Err(RuntimeError::new(
371                    RuntimeErrorCode::PluginSessionManager,
372                    err.to_string(),
373                ));
374            }
375        };
376
377        self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
378        let finalized = match plugins
379            .finalize_turn_with_phase_probe(
380                assembled,
381                manager.state_service(),
382                manager.lifecycle_service(),
383                manager.graph_service(),
384                self.turn_phase_probe.clone(),
385            )
386            .await
387        {
388            Ok(finalized) => finalized,
389            Err(err) => {
390                self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
391                return Err(RuntimeError::new(
392                    RuntimeErrorCode::PluginFinalizeTurn,
393                    err.to_string(),
394                ));
395            }
396        };
397        self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
398        self.ensure_session_execution_lease_live(session_execution_lease)
399            .await?;
400
401        let mut returned_turn = finalized.turn;
402        let release_session_execution_lease =
403            session_execution_lease_release_policy.should_release(&returned_turn.outcome);
404        self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
405        self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
406        let queued_work_completion_trace = queued_work_completions.clone();
407        let pending_attachment_ids = self
408            .host
409            .core
410            .durability
411            .attachment_store
412            .pending_manifest_commit_ids();
413        if let Err(err) = turn_pipeline
414            .final_commit(
415                &mut returned_turn,
416                self.session.as_mut(),
417                &turn_usage_delta,
418                Some(&trace_turn_id),
419                queued_work_completions,
420                pending_attachment_ids.clone(),
421                release_session_execution_lease
422                    .then(|| session_execution_lease.map(SessionExecutionLeaseGuard::completion))
423                    .flatten(),
424            )
425            .await
426        {
427            self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
428            self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
429            return Err(err);
430        }
431        if release_session_execution_lease && let Some(lease) = session_execution_lease {
432            lease.mark_released();
433        }
434        self.host
435            .core
436            .durability
437            .attachment_store
438            .mark_manifest_committed(&pending_attachment_ids);
439        self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
440
441        emit_session_events_to_sink(events, finalized.events).await;
442        self.state = turn_pipeline.into_final_state();
443        if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
444            && let Some(session) = self.session.as_mut()
445        {
446            let protocol_session = Arc::clone(session.plugins().protocol_session());
447            let session_id = self.state.session_id.clone();
448            protocol_session
449                .restore_session(
450                    crate::plugin::ProtocolSessionContext::new(session, &session_id),
451                    &self.state,
452                )
453                .await
454                .map_err(|err| {
455                    RuntimeError::new(
456                        RuntimeErrorCode::Other("protocol_restore_session".to_string()),
457                        err.to_string(),
458                    )
459                })?;
460        }
461        if !queued_work_completion_trace.is_empty() {
462            crate::trace::emit_trace(
463                &self.host.core.tracing.trace_sink,
464                &self.host.core.tracing.trace_context,
465                lash_trace::TraceContext::default()
466                    .for_session(returned_turn.state.session_id.clone())
467                    .for_turn_index(returned_turn.state.turn_index)
468                    .for_turn(trace_turn_id.clone()),
469                lash_trace::TraceEvent::Custom {
470                    name: "queued_work.completed".to_string(),
471                    payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
472                },
473                self.host.core.clock.as_ref(),
474            );
475        }
476        self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
477        self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
478            .await?;
479        self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
480        self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
481
482        self.emit_completed_turn_trace(
483            &returned_turn.state,
484            &returned_turn.outcome,
485            &trace_turn_id,
486        );
487        Ok(returned_turn)
488    }
489
490    fn emit_completed_turn_trace(
491        &self,
492        state: &SessionSnapshot,
493        outcome: &TurnOutcome,
494        trace_turn_id: &str,
495    ) {
496        if self.host.core.tracing.trace_sink.is_none() {
497            return;
498        }
499
500        let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
501        crate::trace::emit_trace(
502            &self.host.core.tracing.trace_sink,
503            &self.host.core.tracing.trace_context,
504            lash_trace::TraceContext::default()
505                .for_session(state.session_id.clone())
506                .for_turn_index(state.turn_index)
507                .for_turn(trace_turn_id.to_string()),
508            lash_trace::TraceEvent::TurnCompleted {
509                status: status.to_string(),
510                done_reason: done_reason.to_string(),
511                agent_frame_switch,
512            },
513            self.host.core.clock.as_ref(),
514        );
515    }
516
517    async fn emit_turn_persisted_event(
518        &self,
519        returned_turn: &AssembledTurn,
520        scoped_effect_controller: &ScopedEffectController<'_>,
521        trace_turn_id: &str,
522    ) -> Result<(), RuntimeError> {
523        let Some(session) = self.session.as_ref() else {
524            return Ok(());
525        };
526        let Ok(manager) = self.runtime_session_services() else {
527            return Ok(());
528        };
529        let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
530        let phase_controller = scoped_child_turn_controller(
531            scoped_effect_controller,
532            &self.state.session_id,
533            &phase_turn_id,
534        )?;
535        let direct_completions = manager.direct_completion_client(
536            RuntimeEffectControllerHandle::borrowed(phase_controller),
537            Some(phase_turn_id),
538        );
539
540        session
541            .plugins()
542            .emit_runtime_event_with_phase_probe(
543                crate::PluginLifecycleEvent::TurnPersisted(Box::new(
544                    crate::SessionStateChangedContext {
545                        session_id: self.state.session_id.clone(),
546                        state: crate::SessionReadView::from_snapshot(&returned_turn.state),
547                        sessions: manager.state_service(),
548                        session_graph: manager.graph_service(),
549                        direct_completions,
550                    },
551                )),
552                self.turn_phase_probe.clone(),
553            )
554            .await;
555        Ok(())
556    }
557
558    /// Run a single turn and stream events to the host sink.
559    pub async fn stream_turn(
560        &mut self,
561        input: TurnInput,
562        opts: TurnOptions<'_>,
563    ) -> Result<AssembledTurn, RuntimeError> {
564        let cancel = opts.cancel.clone();
565        let session_execution_lease = self
566            .claim_session_execution_lease(cancel.clone(), true)
567            .await?;
568        let scoped_effect_controller = opts.scoped_effect_controller();
569        let result = self
570            .stream_turn_with_scoped_effect_controller_inner(
571                input,
572                opts.events_or_noop(),
573                opts.turn_events_or_noop(),
574                scoped_effect_controller,
575                cancel,
576                None,
577                session_execution_lease.as_ref(),
578                SessionExecutionLeaseReleasePolicy::FinalCommit,
579            )
580            .await;
581        self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
582            .await
583    }
584
585    pub async fn stream_next_queued_work(
586        &mut self,
587        opts: TurnOptions<'_>,
588    ) -> Result<Option<AssembledTurn>, RuntimeError> {
589        self.stream_queued_work(opts, None).await
590    }
591
592    pub async fn stream_selected_queued_work(
593        &mut self,
594        opts: TurnOptions<'_>,
595        batch_ids: &[String],
596    ) -> Result<Option<AssembledTurn>, RuntimeError> {
597        self.stream_queued_work(opts, Some(batch_ids)).await
598    }
599
600    async fn stream_queued_work(
601        &mut self,
602        opts: TurnOptions<'_>,
603        selected_batch_ids: Option<&[String]>,
604    ) -> Result<Option<AssembledTurn>, RuntimeError> {
605        let cancel = opts.cancel.clone();
606        let Some(session_execution_lease) = self
607            .claim_session_execution_lease(cancel.clone(), false)
608            .await?
609        else {
610            return Ok(None);
611        };
612        let session_execution_fence = session_execution_lease.fence();
613        if self
614            .drain_next_session_command(&session_execution_fence)
615            .await?
616            .is_some()
617        {
618            session_execution_lease
619                .release_if_live()
620                .await
621                .map_err(|err| {
622                    RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
623                })?;
624            return Ok(None);
625        }
626        let Some(store) = self
627            .session
628            .as_ref()
629            .and_then(|session| session.history_store())
630        else {
631            return Ok(None);
632        };
633        let claim = if let Some(batch_ids) = selected_batch_ids {
634            store
635                .claim_ready_queued_work_by_batch_ids(
636                    &self.state.session_id,
637                    &session_execution_fence,
638                    &self.runtime_scope_id,
639                    crate::QueuedWorkClaimBoundary::Idle,
640                    crate::QUEUED_WORK_CLAIM_TTL_MS,
641                    batch_ids,
642                )
643                .await
644        } else {
645            store
646                .claim_ready_queued_work(
647                    &self.state.session_id,
648                    &session_execution_fence,
649                    &self.runtime_scope_id,
650                    crate::QueuedWorkClaimBoundary::Idle,
651                    crate::QUEUED_WORK_CLAIM_TTL_MS,
652                    64,
653                )
654                .await
655        }
656        .map_err(super::runtime_error_from_store_commit)?;
657        let Some(claim) = claim else {
658            session_execution_lease
659                .release_if_live()
660                .await
661                .map_err(|err| {
662                    RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
663                })?;
664            return Ok(None);
665        };
666        let mut work = claim.materialize_for_turn();
667        let turn_id = work
668            .input
669            .trace_turn_id
670            .clone()
671            .or_else(|| Some(opts.execution_scope_id().to_owned()))
672            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
673        work.input.trace_turn_id = Some(turn_id.clone());
674        let causes = work.turn_causes.clone();
675        emit_queued_work_started_to_sink(
676            opts.turn_events_or_noop(),
677            crate::QueuedWorkClaimBoundary::Idle,
678            &claim,
679            causes.clone(),
680        )
681        .await;
682        crate::trace::emit_trace(
683            &self.host.core.tracing.trace_sink,
684            &self.host.core.tracing.trace_context,
685            lash_trace::TraceContext::default()
686                .for_session(self.state.session_id.clone())
687                .for_turn_index(self.state.turn_index + 1)
688                .for_turn(turn_id.clone()),
689            lash_trace::TraceEvent::Custom {
690                name: "queued_work.claimed".to_string(),
691                payload: queued_work_trace_payload(
692                    crate::QueuedWorkClaimBoundary::Idle,
693                    &claim,
694                    &causes,
695                ),
696            },
697            self.host.core.clock.as_ref(),
698        );
699        let claim_for_abandon = claim.clone();
700        let scoped_effect_controller = opts.scoped_effect_controller();
701        let result = self
702            .stream_turn_with_scoped_effect_controller_inner(
703                work.input,
704                opts.events_or_noop(),
705                opts.turn_events_or_noop(),
706                scoped_effect_controller,
707                cancel,
708                Some(claim),
709                Some(&session_execution_lease),
710                SessionExecutionLeaseReleasePolicy::FinalCommit,
711            )
712            .await
713            .map(Some);
714        if let Err(err) = &result {
715            self.abandon_queued_work_claims_after_lease_loss(
716                err,
717                std::slice::from_ref(&claim_for_abandon),
718            )
719            .await;
720        }
721        self.settle_session_execution_lease(Some(&session_execution_lease), result)
722            .await
723    }
724
725    /// Enforce the durable-first wiring invariant at a turn-scope boundary: when
726    /// the host wired a durable effect host, every store reachable from this
727    /// scope must also be durable. A durable host running against any ephemeral
728    /// store fails loudly here rather than silently degrading.
729    ///
730    /// Inline controllers (the default tier) impose no requirement, so
731    /// inline/in-memory hosts pass unchanged.
732    fn ensure_durable_store_facets_for_scope(
733        &self,
734        scoped_effect_controller: &ScopedEffectController<'_>,
735    ) -> Result<(), RuntimeError> {
736        if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
737        {
738            return Ok(());
739        }
740        if self
741            .host
742            .core
743            .durability
744            .attachment_store
745            .persistence()
746            .durability_tier()
747            != crate::DurabilityTier::Durable
748        {
749            return Err(RuntimeError::durable_store_required(
750                crate::DurableStoreFacet::AttachmentStore,
751            ));
752        }
753        if self
754            .host
755            .core
756            .durability
757            .process_env_store
758            .durability_tier()
759            != crate::DurabilityTier::Durable
760        {
761            return Err(RuntimeError::durable_store_required(
762                crate::DurableStoreFacet::ProcessEnvStore,
763            ));
764        }
765        if let Some(store) = self
766            .session
767            .as_ref()
768            .and_then(|session| session.history_store())
769            && store.durability_tier() != crate::DurabilityTier::Durable
770        {
771            return Err(RuntimeError::durable_store_required(
772                crate::DurableStoreFacet::SessionStore,
773            ));
774        }
775        if let Some(process_registry) = self.host.process_registry.as_ref()
776            && process_registry.durability_tier() != crate::DurabilityTier::Durable
777        {
778            return Err(RuntimeError::durable_store_required(
779                crate::DurableStoreFacet::ProcessRegistry,
780            ));
781        }
782        Ok(())
783    }
784
785    #[allow(clippy::too_many_arguments)]
786    async fn stream_turn_with_scoped_effect_controller_inner(
787        &mut self,
788        mut input: TurnInput,
789        events: &dyn EventSink,
790        turn_events: &dyn TurnActivitySink,
791        scoped_effect_controller: ScopedEffectController<'_>,
792        cancel: CancellationToken,
793        queued_claim: Option<crate::QueuedWorkClaim>,
794        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
795        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
796    ) -> Result<AssembledTurn, RuntimeError> {
797        if queued_claim.is_none() {
798            if let Some(lease) = session_execution_lease {
799                while self
800                    .drain_next_session_command(&lease.fence())
801                    .await?
802                    .is_some()
803                {}
804            } else if self
805                .session
806                .as_ref()
807                .and_then(|session| session.history_store())
808                .is_some()
809            {
810                return Err(RuntimeError::new(
811                    RuntimeErrorCode::StoreCommitFailed,
812                    "session command drain requires a session execution lease",
813                ));
814            }
815        }
816        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
817            && scoped_effect_controller
818                .execution_scope()
819                .validates_turn_trace_id()
820            && input_turn_id != scoped_effect_controller.scope_id()
821        {
822            return Err(RuntimeError::new(
823                RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
824                format!(
825                    "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
826                    scoped_effect_controller.scope_id()
827                ),
828            ));
829        }
830        self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
831        input
832            .trace_turn_id
833            .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
834        self.stream_turn_inner(
835            input.clone(),
836            events,
837            turn_events,
838            scoped_effect_controller,
839            cancel.clone(),
840            queued_claim,
841            session_execution_lease,
842            session_execution_lease_release_policy,
843        )
844        .await
845    }
846
847    /// Stream one logical host turn, following foreground AgentFrame switches
848    /// until a terminal outcome is reached.
849    ///
850    /// A protocol continuation creates a new frame in the same session. Hosts
851    /// that only care about the benchmark/app answer should not need to
852    /// special-case that intermediate outcome; this helper keeps driving the
853    /// same session through each frame's task with the normal runtime turn
854    /// guards.
855    pub async fn stream_turn_with_agent_frames(
856        &mut self,
857        input: TurnInput,
858        opts: TurnOptions<'_>,
859    ) -> Result<AgentFrameRun, RuntimeError> {
860        let cancel = opts.cancel.clone();
861        let session_execution_lease = self
862            .claim_session_execution_lease(cancel.clone(), true)
863            .await?;
864        let scoped_effect_controller = opts.scoped_effect_controller();
865        let result = self
866            .stream_turn_with_agent_frames_inner(
867                input,
868                opts.events_or_noop(),
869                opts.turn_events_or_noop(),
870                scoped_effect_controller,
871                cancel,
872                session_execution_lease.as_ref(),
873            )
874            .await;
875        self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
876            .await
877    }
878
879    async fn stream_turn_with_agent_frames_inner(
880        &mut self,
881        mut input: TurnInput,
882        events: &dyn EventSink,
883        turn_events: &dyn TurnActivitySink,
884        scoped_effect_controller: ScopedEffectController<'_>,
885        cancel: CancellationToken,
886        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
887    ) -> Result<AgentFrameRun, RuntimeError> {
888        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
889            && scoped_effect_controller
890                .execution_scope()
891                .validates_turn_trace_id()
892            && input_turn_id != scoped_effect_controller.scope_id()
893        {
894            return Err(RuntimeError::new(
895                RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
896                format!(
897                    "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
898                    scoped_effect_controller.scope_id()
899                ),
900            ));
901        }
902        let follow_protocol_turn_options = input.protocol_turn_options.clone();
903        let follow_turn_context = input.turn_context.clone();
904        let follow_trace_turn_id = input
905            .trace_turn_id
906            .clone()
907            .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
908        input
909            .trace_turn_id
910            .get_or_insert(follow_trace_turn_id.clone());
911        let mut turns = Vec::new();
912        loop {
913            let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
914            input.trace_turn_id = Some(turn_trace_turn_id.clone());
915            let turn_effect_controller = if turns.is_empty() {
916                scoped_effect_controller.clone()
917            } else {
918                ScopedEffectController::borrowed(
919                    scoped_effect_controller.controller(),
920                    ExecutionScope::turn(&self.state.session_id, &turn_trace_turn_id),
921                )?
922            };
923            let turn = self
924                .stream_turn_with_scoped_effect_controller_inner(
925                    input,
926                    events,
927                    turn_events,
928                    turn_effect_controller,
929                    cancel.clone(),
930                    None,
931                    session_execution_lease,
932                    SessionExecutionLeaseReleasePolicy::KeepOnAgentFrameSwitch,
933                )
934                .await?;
935            let switched_frame = match &turn.outcome {
936                TurnOutcome::AgentFrameSwitch { frame_id, task } => {
937                    Some((frame_id.clone(), task.clone()))
938                }
939                _ => None,
940            };
941            turns.push(turn);
942
943            let Some((_frame_id, task)) = switched_frame else {
944                return Ok(AgentFrameRun { turns });
945            };
946            input = turn_input_from_text(task);
947            input.protocol_turn_options = follow_protocol_turn_options.clone();
948            input.turn_context = follow_turn_context.clone();
949        }
950    }
951
952    #[allow(clippy::too_many_arguments)]
953    async fn stream_turn_inner(
954        &mut self,
955        mut input: TurnInput,
956        events: &dyn EventSink,
957        turn_events: &dyn TurnActivitySink,
958        scoped_effect_controller: ScopedEffectController<'_>,
959        cancel: CancellationToken,
960        queued_claim: Option<crate::QueuedWorkClaim>,
961        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
962        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
963    ) -> Result<AssembledTurn, RuntimeError> {
964        self.refresh_session_graph_from_store()
965            .await
966            .map_err(session_head_refresh_error)?;
967        let input_trace_turn_id = input.trace_turn_id.clone();
968        let queued_turn_work = queued_claim
969            .as_ref()
970            .map(crate::QueuedWorkClaim::materialize_for_turn);
971        if let Some(work) = queued_turn_work.as_ref()
972            && input.items.is_empty()
973            && input.image_blobs.is_empty()
974        {
975            input = work.input.clone();
976            if input.trace_turn_id.is_none() {
977                input.trace_turn_id = input_trace_turn_id;
978            }
979        }
980        if self
981            .session
982            .as_ref()
983            .and_then(|session| session.history_store())
984            .is_some()
985        {
986            ensure_durable_effect_input(&input)?;
987        }
988        if let Some(extension) = &input.protocol_extension
989            && let Some(session) = self.session.as_ref()
990        {
991            let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
992            protocol_session
993                .validate_turn_extension(extension)
994                .await
995                .map_err(|err| {
996                    RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
997                })?;
998        }
999        let previous_prompt_usage = self.state.last_prompt_usage.clone();
1000        let normalized = match self
1001            .normalize_input_items(&input.items, &input.image_blobs)
1002            .await
1003        {
1004            Ok(items) => items,
1005            Err(e) => {
1006                self.state.last_prompt_usage = None;
1007                let mut assembler = TurnAssembler::default();
1008                let error_event = SessionEvent::Error {
1009                    message: e.clone(),
1010                    envelope: Some(crate::session_model::ErrorEnvelope {
1011                        kind: "input_validation".to_string(),
1012                        code: Some("invalid_turn_input".to_string()),
1013                        terminal_reason: None,
1014                        user_message: e.clone(),
1015                        raw: None,
1016                    }),
1017                };
1018                assembler.push(&error_event);
1019                emit_turn_activity_to_sink(
1020                    turn_events,
1021                    TurnActivity::independent(TurnEvent::Error { message: e }),
1022                )
1023                .await;
1024                emit_session_event_to_sink(events, error_event).await;
1025                let outcome_event = SessionEvent::TurnOutcome {
1026                    outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
1027                };
1028                assembler.push(&outcome_event);
1029                emit_session_event_to_sink(events, outcome_event).await;
1030                assembler.push(&SessionEvent::Done);
1031                emit_session_event_to_sink(events, SessionEvent::Done).await;
1032                return Ok(assembler.finish(
1033                    self.state.to_snapshot(),
1034                    false,
1035                    None,
1036                    &self.host.core.control.termination,
1037                ));
1038            }
1039        };
1040        let turn_index = self.state.turn_index + 1;
1041        let trace_turn_id = input
1042            .trace_turn_id
1043            .clone()
1044            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1045        if self.host.core.tracing.trace_sink.is_some() {
1046            let mut trace_metadata = std::collections::BTreeMap::new();
1047            trace_metadata.insert(
1048                "input_item_count".to_string(),
1049                serde_json::json!(normalized.len()),
1050            );
1051            crate::trace::emit_trace(
1052                &self.host.core.tracing.trace_sink,
1053                &self.host.core.tracing.trace_context,
1054                lash_trace::TraceContext::default()
1055                    .for_session(self.state.session_id.clone())
1056                    .for_turn_index(turn_index)
1057                    .for_turn(trace_turn_id.clone()),
1058                lash_trace::TraceEvent::TurnStarted {
1059                    metadata: trace_metadata,
1060                },
1061                self.host.core.clock.as_ref(),
1062            );
1063        }
1064
1065        let base_read_model = self.state.read_model();
1066        let base_messages = base_read_model.messages;
1067        let base_render_cache = base_read_model.prompt_render_cache;
1068        let mut turn_delta = Vec::new();
1069        let initial_turn_causes = queued_turn_work
1070            .as_ref()
1071            .map(|work| work.turn_causes.clone())
1072            .unwrap_or_default();
1073        turn_delta.extend(
1074            initial_turn_causes
1075                .iter()
1076                .map(crate::TurnCause::to_event_message),
1077        );
1078
1079        let user_id = fresh_message_id();
1080        let mut user_parts: Vec<Part> = Vec::new();
1081        for item in normalized {
1082            match item {
1083                NormalizedItem::Text(text) => {
1084                    if text.is_empty() {
1085                        continue;
1086                    }
1087                    user_parts.push(Part {
1088                        id: format!("{}.p{}", user_id, user_parts.len()),
1089                        kind: PartKind::Text,
1090                        content: text,
1091                        attachment: None,
1092                        tool_call_id: None,
1093                        tool_name: None,
1094                        tool_replay: None,
1095                        prune_state: PruneState::Intact,
1096                        reasoning_meta: None,
1097                        response_meta: None,
1098                    });
1099                }
1100                NormalizedItem::Image(reference) => {
1101                    user_parts.push(Part {
1102                        id: format!("{}.p{}", user_id, user_parts.len()),
1103                        kind: PartKind::Image,
1104                        content: String::new(),
1105                        attachment: Some(crate::session_model::message::PartAttachment {
1106                            reference,
1107                        }),
1108                        tool_call_id: None,
1109                        tool_name: None,
1110                        tool_replay: None,
1111                        prune_state: PruneState::Intact,
1112                        reasoning_meta: None,
1113                        response_meta: None,
1114                    });
1115                }
1116            }
1117        }
1118        if user_parts.is_empty() && initial_turn_causes.is_empty() {
1119            user_parts.push(Part {
1120                id: format!("{}.p0", user_id),
1121                kind: PartKind::Text,
1122                content: String::new(),
1123                attachment: None,
1124                tool_call_id: None,
1125                tool_name: None,
1126                tool_replay: None,
1127                prune_state: PruneState::Intact,
1128                reasoning_meta: None,
1129                response_meta: None,
1130            });
1131        }
1132        if !user_parts.is_empty() {
1133            reassign_part_ids(&user_id, &mut user_parts);
1134            turn_delta.push(Message {
1135                id: user_id.clone(),
1136                role: MessageRole::User,
1137                parts: shared_parts(user_parts),
1138                origin: None,
1139            });
1140        }
1141
1142        let manager = self
1143            .runtime_session_services_for_turn(None)
1144            .map_err(|err| {
1145                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1146            })?;
1147        let plugin_session = self
1148            .session
1149            .as_ref()
1150            .map(|s| Arc::clone(s.plugins()))
1151            .ok_or_else(|| {
1152                RuntimeError::new(
1153                    RuntimeErrorCode::ContextPrepareTurn,
1154                    "runtime session not available",
1155                )
1156            })?;
1157        let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
1158        let prepare_phase_controller = scoped_child_turn_controller(
1159            &scoped_effect_controller,
1160            &self.state.session_id,
1161            &prepare_phase_turn_id,
1162        )?;
1163        let turn_ctx = crate::TurnTransformContext {
1164            session_id: self.state.session_id.clone(),
1165            state: self.read_view(),
1166            prompt_usage: previous_prompt_usage.clone(),
1167            max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
1168            sessions: manager.state_service(),
1169            session_lifecycle: manager.lifecycle_service(),
1170            session_graph: manager.graph_service(),
1171            scoped_effect_controller: scoped_effect_controller.clone(),
1172            direct_completions: manager.direct_completion_client(
1173                RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
1174                Some(prepare_phase_turn_id),
1175            ),
1176        };
1177        self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
1178        let prepared_context = plugin_session
1179            .prepare_turn_context(
1180                &turn_ctx,
1181                crate::session_model::context::PreparedContext {
1182                    messages: crate::MessageSequence::from_base_and_delta(
1183                        base_messages,
1184                        turn_delta,
1185                    )
1186                    .with_base_render_cache(base_render_cache),
1187                    ..Default::default()
1188                },
1189                self.turn_phase_probe.clone(),
1190            )
1191            .await
1192            .map_err(|err| {
1193                RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
1194            })?;
1195        self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
1196        // Release the read-view's graph clone before the rest of the turn
1197        // runs. Keeping it alive into `stream_prepared_turn` forces the
1198        // post-turn `append_active_read_delta` to deep-clone the session
1199        // graph (Arc::make_mut with refcount > 1).
1200        drop(turn_ctx);
1201        let messages = prepared_context.messages;
1202        if let Some(session) = self.session.as_mut() {
1203            session
1204                .set_context_overlay(
1205                    prepared_context.tool_providers,
1206                    prepared_context.prompt_contributions,
1207                    prepared_context.include_base_tools,
1208                )
1209                .map_err(|err| {
1210                    RuntimeError::new(
1211                        RuntimeErrorCode::Other("session_tool_registry".to_string()),
1212                        err.to_string(),
1213                    )
1214                })?;
1215        }
1216
1217        self.state.last_prompt_usage = None;
1218        Box::pin(self.stream_prepared_turn_inner(
1219            messages,
1220            previous_prompt_usage,
1221            input.protocol_turn_options.clone(),
1222            input.protocol_extension.clone(),
1223            input.turn_context.clone(),
1224            initial_turn_causes,
1225            trace_turn_id,
1226            turn_index,
1227            events,
1228            turn_events,
1229            scoped_effect_controller,
1230            cancel,
1231            queued_claim,
1232            session_execution_lease,
1233            session_execution_lease_release_policy,
1234        ))
1235        .await
1236    }
1237
1238    /// Run a single turn and return only the assembled terminal result.
1239    pub async fn run_turn_assembled(
1240        &mut self,
1241        input: TurnInput,
1242        cancel: CancellationToken,
1243        scoped_effect_controller: ScopedEffectController<'_>,
1244    ) -> Result<AssembledTurn, RuntimeError> {
1245        self.stream_turn(input, TurnOptions::new(cancel, scoped_effect_controller))
1246            .await
1247    }
1248
1249    /// Run a turn using host-prepared message history.
1250    #[allow(clippy::too_many_arguments)]
1251    pub async fn stream_prepared_turn(
1252        &mut self,
1253        messages: crate::MessageSequence,
1254        previous_prompt_usage: Option<PromptUsage>,
1255        protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1256        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1257        turn_context: crate::TurnContext,
1258        initial_turn_causes: Vec<crate::TurnCause>,
1259        trace_turn_id: String,
1260        turn_index: usize,
1261        events: &dyn EventSink,
1262        turn_events: &dyn TurnActivitySink,
1263        scoped_effect_controller: ScopedEffectController<'_>,
1264        cancel: CancellationToken,
1265        initial_queue_claim: Option<crate::QueuedWorkClaim>,
1266    ) -> Result<AssembledTurn, RuntimeError> {
1267        let session_execution_lease = self
1268            .claim_session_execution_lease(cancel.clone(), true)
1269            .await?;
1270        let result = self
1271            .stream_prepared_turn_inner(
1272                messages,
1273                previous_prompt_usage,
1274                protocol_turn_options,
1275                protocol_extension,
1276                turn_context,
1277                initial_turn_causes,
1278                trace_turn_id,
1279                turn_index,
1280                events,
1281                turn_events,
1282                scoped_effect_controller,
1283                cancel,
1284                initial_queue_claim,
1285                session_execution_lease.as_ref(),
1286                SessionExecutionLeaseReleasePolicy::FinalCommit,
1287            )
1288            .await;
1289        self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
1290            .await
1291    }
1292
1293    #[allow(clippy::too_many_arguments)]
1294    async fn stream_prepared_turn_inner(
1295        &mut self,
1296        messages: crate::MessageSequence,
1297        _previous_prompt_usage: Option<PromptUsage>,
1298        protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1299        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1300        turn_context: crate::TurnContext,
1301        initial_turn_causes: Vec<crate::TurnCause>,
1302        trace_turn_id: String,
1303        turn_index: usize,
1304        events: &dyn EventSink,
1305        turn_events: &dyn TurnActivitySink,
1306        scoped_effect_controller: ScopedEffectController<'_>,
1307        cancel: CancellationToken,
1308        initial_queue_claim: Option<crate::QueuedWorkClaim>,
1309        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
1310        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
1311    ) -> Result<AssembledTurn, RuntimeError> {
1312        if session_execution_lease.is_none()
1313            && self
1314                .session
1315                .as_ref()
1316                .and_then(|session| session.history_store())
1317                .is_some()
1318        {
1319            return Err(RuntimeError::new(
1320                RuntimeErrorCode::StoreCommitFailed,
1321                "prepared turn requires a session execution lease",
1322            ));
1323        }
1324        let session_execution_fence =
1325            session_execution_lease.map(SessionExecutionLeaseGuard::fence);
1326        let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
1327        let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
1328        let mut turn_policy = self.state.effective_policy().clone();
1329        let turn_provider_override = turn_context.provider().cloned();
1330        if let Some(provider) = turn_provider_override.as_ref() {
1331            turn_policy.provider_id = provider.kind().to_string();
1332        }
1333        if let Some(model) = turn_context.model_spec() {
1334            turn_policy.model = model.clone();
1335        }
1336        let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
1337        let effective_protocol_turn_options = protocol_turn_options
1338            .clone()
1339            .map(|options| session_protocol_turn_options.merged_with_override(&options))
1340            .unwrap_or(session_protocol_turn_options);
1341        let manager = self
1342            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1343            .map_err(|err| {
1344                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1345            })?;
1346        let plugins = {
1347            let session = self
1348                .session
1349                .as_ref()
1350                .expect("lash runtime session must be available");
1351            Arc::clone(session.plugins())
1352        };
1353        let mut assembler = TurnAssembler::new();
1354        self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
1355        // Block-scope the pinned future so it (and its captured
1356        // `SessionReadView` clone of the session graph) drops before the
1357        // post-turn `append_active_read_delta` mutation. Keeping it alive
1358        // across the turn forces `Arc::make_mut` to deep-clone
1359        // `SessionGraphData`.
1360        let prepared = {
1361            let prepare_turn = plugins.prepare_turn_with_phase_probe(
1362                PrepareTurnRequest {
1363                    session_id: self.state.session_id.clone(),
1364                    state: crate::SessionReadView::from_runtime_state(
1365                        &self.state,
1366                        turn_policy.clone(),
1367                        effective_protocol_turn_options.clone(),
1368                    ),
1369                    messages,
1370                    sessions: manager.state_service(),
1371                    session_lifecycle: manager.lifecycle_service(),
1372                    session_graph: manager.graph_service(),
1373                    turn_context: turn_context.clone(),
1374                },
1375                self.turn_phase_probe.clone(),
1376            );
1377            let mut prepare_turn = Box::pin(prepare_turn);
1378
1379            loop {
1380                tokio::select! {
1381                    prepared = prepare_turn.as_mut() => {
1382                        let prepared = prepared.map_err(|err| {
1383                            RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
1384                        })?;
1385                        self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
1386                        break prepared;
1387                    }
1388                    maybe_event = event_rx.recv() => {
1389                        if let Some(event) = maybe_event {
1390                            emit_runtime_stream_event_to_sinks(
1391                                events,
1392                                turn_events,
1393                                event,
1394                                &mut assembler,
1395                            )
1396                            .await;
1397                        }
1398                    }
1399                }
1400            }
1401        };
1402        for event in &prepared.events {
1403            assembler.push(event);
1404        }
1405        emit_session_events_to_sink(events, prepared.events).await;
1406        if let Some(abort) = prepared.abort {
1407            drop(event_tx);
1408
1409            let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1410                self.state.clone(),
1411                Arc::clone(&self.host.core.clock),
1412            );
1413            turn_pipeline.apply_prepared_messages(&prepared.messages);
1414            let state = turn_pipeline.into_final_state();
1415            let issue = TurnIssue {
1416                kind: "plugin".to_string(),
1417                code: Some(abort.code),
1418                terminal_reason: None,
1419                message: abort.message.clone(),
1420                raw: None,
1421            };
1422            let error_event = SessionEvent::Error {
1423                message: abort.message,
1424                envelope: Some(crate::session_model::ErrorEnvelope {
1425                    kind: "plugin".to_string(),
1426                    code: issue.code.clone(),
1427                    terminal_reason: None,
1428                    user_message: issue.message.clone(),
1429                    raw: None,
1430                }),
1431            };
1432            assembler.push(&error_event);
1433            emit_turn_activity_to_sink(
1434                turn_events,
1435                TurnActivity::independent(TurnEvent::Error {
1436                    message: issue.message.clone(),
1437                }),
1438            )
1439            .await;
1440            emit_session_event_to_sink(events, error_event).await;
1441            let outcome_event = SessionEvent::TurnOutcome {
1442                outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
1443            };
1444            assembler.push(&outcome_event);
1445            emit_session_event_to_sink(events, outcome_event).await;
1446            assembler.push(&SessionEvent::Done);
1447            emit_session_event_to_sink(events, SessionEvent::Done).await;
1448            return Ok(assembler.finish(
1449                state.to_snapshot(),
1450                cancel.is_cancelled(),
1451                Some(issue),
1452                &self.host.core.control.termination,
1453            ));
1454        }
1455        let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1456            self.state.clone(),
1457            Arc::clone(&self.host.core.clock),
1458        )
1459        .with_session_execution_lease(session_execution_fence.clone());
1460        let store = self
1461            .session
1462            .as_ref()
1463            .and_then(|session| session.history_store());
1464        // Durable controllers, like Restate, own in-flight replay. Writing
1465        // progress checkpoints directly to the shared store would make handler
1466        // replay observe a newer partial turn and change effect replay keys.
1467        let progress_store = if scoped_effect_controller.controller().durability_tier()
1468            == crate::DurabilityTier::Durable
1469        {
1470            None
1471        } else {
1472            store.as_ref().map(|store| store.as_ref())
1473        };
1474        turn_pipeline
1475            .prepared_checkpoint(
1476                progress_store,
1477                turn_policy.clone(),
1478                turn_index,
1479                &prepared.messages,
1480                self.session.as_mut(),
1481            )
1482            .await
1483            .map_err(super::runtime_error_from_store_commit)?;
1484        let resolved_turn_policy = if let Some(provider) = turn_provider_override {
1485            RuntimeSessionPolicy::from_provider(
1486                turn_policy.clone(),
1487                provider.with_clock(Arc::clone(&self.host.core.clock)),
1488            )
1489            .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1490        } else {
1491            self.host
1492                .resolve_session_policy(&self.state.session_id, turn_policy.clone())
1493                .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1494        };
1495        let manager = self
1496            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1497            .map_err(|err| {
1498                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1499            })?;
1500        let cancel_state = cancel.clone();
1501        let finish_scoped_effect_controller = scoped_effect_controller.clone();
1502        let session = self
1503            .session
1504            .take()
1505            .expect("lash runtime session must be available");
1506        let mut driver = RuntimeTurnDriver {
1507            session,
1508            policy: resolved_turn_policy,
1509            host: self.host.clone(),
1510            turn_id: scoped_effect_controller.scope_id().to_string(),
1511            scoped_effect_controller,
1512            session_id: self.state.session_id.clone(),
1513            turn_index,
1514            turn_pipeline,
1515            llm_stream_summaries: HashMap::new(),
1516            next_llm_ordinal: 0,
1517            session_services: manager,
1518            protocol_turn_options: effective_protocol_turn_options,
1519            protocol_extension,
1520            turn_context,
1521            turn_causes: initial_turn_causes,
1522            pending_queue_claims: initial_queue_claim.into_iter().collect(),
1523            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1524            session_execution_lease: session_execution_fence,
1525            turn_phase_probe: self.turn_phase_probe.clone(),
1526        };
1527        let protocol_run_offset = 0;
1528        self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
1529        let run_result = drive_turn_to_completion(
1530            driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
1531            &mut event_rx,
1532            &mut assembler,
1533            &child_usage_event_relay,
1534            events,
1535            turn_events,
1536        )
1537        .await;
1538        let (new_messages, _new_protocol_iteration) = match run_result {
1539            Ok(result) => result,
1540            Err(err) => {
1541                self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1542                let RuntimeTurnDriver {
1543                    session,
1544                    pending_queue_claims,
1545                    ..
1546                } = driver;
1547                self.session = Some(session);
1548                self.abandon_queued_work_claims_after_lease_loss(&err, &pending_queue_claims)
1549                    .await;
1550                return Err(err);
1551            }
1552        };
1553        self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1554        tracing::debug!(
1555            new_message_count = new_messages.len(),
1556            tool_call_count = assembler.tool_calls.len(),
1557            "runtime post-run_task"
1558        );
1559
1560        let RuntimeTurnDriver {
1561            session,
1562            policy,
1563            turn_pipeline,
1564            pending_queue_claims,
1565            ..
1566        } = driver;
1567        self.session = Some(session);
1568        let pending_queue_claims_for_abandon = pending_queue_claims.clone();
1569        let finish_result = self
1570            .finish_turn(
1571                TurnFinishInput {
1572                    turn_pipeline,
1573                    assembler,
1574                    new_messages,
1575                    policy,
1576                    turn_index,
1577                    queued_work_completions: pending_queue_claims
1578                        .iter()
1579                        .map(crate::QueuedWorkClaim::completion)
1580                        .collect(),
1581                    trace_turn_id,
1582                },
1583                events,
1584                &finish_scoped_effect_controller,
1585                &cancel_state,
1586                session_execution_lease,
1587                session_execution_lease_release_policy,
1588            )
1589            .await;
1590        if let Err(err) = &finish_result {
1591            self.abandon_queued_work_claims_after_lease_loss(
1592                err,
1593                &pending_queue_claims_for_abandon,
1594            )
1595            .await;
1596        }
1597        finish_result
1598    }
1599    async fn normalize_input_items(
1600        &self,
1601        items: &[InputItem],
1602        image_blobs: &HashMap<String, Vec<u8>>,
1603    ) -> Result<Vec<NormalizedItem>, String> {
1604        normalize_input_items(
1605            items,
1606            image_blobs,
1607            self.host.core.durability.attachment_store.as_ref(),
1608        )
1609        .await
1610    }
1611}
1612
1613fn turn_input_from_text(text: String) -> TurnInput {
1614    TurnInput {
1615        items: vec![InputItem::Text { text }],
1616        image_blobs: HashMap::new(),
1617        protocol_turn_options: None,
1618        trace_turn_id: None,
1619        protocol_extension: None,
1620        turn_context: crate::TurnContext::default(),
1621    }
1622}
1623
/// Derive the turn id used for a follow-up turn after an agent-frame switch.
///
/// The first turn (`completed_turn_count == 0`) reuses `root_turn_id` as is.
/// Every later turn gets the suffix `:agent-frame:<count>`, so the ids are
/// deterministic and distinct per count.
fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
    match completed_turn_count {
        0 => root_turn_id.to_owned(),
        follow_index => format!("{root_turn_id}:agent-frame:{follow_index}"),
    }
}
1631
1632pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1633    if input.protocol_extension.is_some() {
1634        return Err(RuntimeError::new(
1635            RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1636            "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1637        ));
1638    }
1639    input
1640        .turn_context
1641        .live_plugin_inputs()
1642        .durable_effect_rejection()?;
1643    Ok(())
1644}
1645
1646async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1647    if !events.is_noop() {
1648        events.emit(activity).await;
1649    }
1650}
1651
1652/// Pump the turn driver's event channel into the host sinks while the run
1653/// future executes, then drain any events emitted between completion and the
1654/// sender dropping.
1655///
1656/// Both the fresh and resumed turn entry points construct a
1657/// `RuntimeTurnDriver`, kick off its run future, and need identical
1658/// event-pump/drain behavior before tearing the driver down. Only the driver
1659/// construction and post-run teardown differ, so each caller owns those and
1660/// shares this loop.
1661async fn drive_turn_to_completion<F>(
1662    run_future: F,
1663    event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1664    assembler: &mut TurnAssembler,
1665    child_usage_event_relay: &ChildUsageEventRelay,
1666    events: &dyn EventSink,
1667    turn_events: &dyn TurnActivitySink,
1668) -> Result<(crate::MessageSequence, usize), RuntimeError>
1669where
1670    F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1671{
1672    let run_result = {
1673        let mut run_future = Box::pin(run_future);
1674        loop {
1675            tokio::select! {
1676                maybe_event = event_rx.recv() => {
1677                    if let Some(event) = maybe_event {
1678                        emit_runtime_stream_event_to_sinks(
1679                            events,
1680                            turn_events,
1681                            event,
1682                            assembler,
1683                        )
1684                        .await;
1685                    }
1686                }
1687                completed = run_future.as_mut() => {
1688                    child_usage_event_relay.clear();
1689                    break completed;
1690                }
1691            }
1692        }
1693    };
1694    while let Some(event) = event_rx.recv().await {
1695        emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1696    }
1697    run_result
1698}
1699
1700async fn emit_runtime_stream_event_to_sinks(
1701    events: &dyn EventSink,
1702    turn_events: &dyn TurnActivitySink,
1703    event: RuntimeStreamEvent,
1704    assembler: &mut TurnAssembler,
1705) {
1706    match event {
1707        RuntimeStreamEvent::Session(event) => {
1708            assembler.push(&event);
1709            emit_session_event_to_sink(events, event).await;
1710        }
1711        RuntimeStreamEvent::Turn(activity) => {
1712            assembler.push_turn_activity(&activity);
1713            emit_turn_activity_to_sink(turn_events, activity).await;
1714        }
1715    }
1716}
1717
#[cfg(test)]
mod tests {
    use super::agent_frame_follow_turn_id;

    #[test]
    fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
        // (completed turn count, expected follow-up id) for the root "root-turn".
        let cases = [
            (0, "root-turn"),
            (1, "root-turn:agent-frame:1"),
            (2, "root-turn:agent-frame:2"),
        ];
        for (completed_turn_count, expected) in cases {
            assert_eq!(
                agent_frame_follow_turn_id("root-turn", completed_turn_count),
                expected
            );
        }
    }
}