Skip to main content

lash_core/runtime/
turn_loop.rs

1use super::*;
2
3fn trace_fields_from_outcome(
4    outcome: &TurnOutcome,
5) -> (
6    &'static str,
7    &'static str,
8    Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10    match outcome {
11        TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12            ("completed", "assistant_message", None)
13        }
14        TurnOutcome::Finished(TurnFinish::FinalValue { .. }) => ("completed", "final_value", None),
15        TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
16        TurnOutcome::AgentFrameSwitch { frame_id, .. } => (
17            "completed",
18            "agent_frame_switch",
19            Some(lash_trace::TraceAgentFrameSwitch {
20                frame_id: frame_id.clone(),
21            }),
22        ),
23        TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
24    }
25}
26
27fn trace_stop_reason(stop: &TurnStop) -> &'static str {
28    match stop {
29        TurnStop::Cancelled => "cancelled",
30        TurnStop::Incomplete => "incomplete",
31        TurnStop::InvalidInput => "invalid_input",
32        TurnStop::MaxTurns => "max_turns",
33        TurnStop::ToolFailure => "tool_failure",
34        TurnStop::ProviderError => "provider_error",
35        TurnStop::PluginAbort => "plugin_abort",
36        TurnStop::RuntimeError => "runtime_error",
37        TurnStop::SubmittedError { .. } => "submitted_error",
38        TurnStop::ToolError { .. } => "tool_error",
39    }
40}
41
42fn session_head_refresh_error(err: SessionError) -> RuntimeError {
43    RuntimeError::new(
44        RuntimeErrorCode::Other("session_head_refresh".to_string()),
45        err.to_string(),
46    )
47}
48
49#[derive(Clone, Copy)]
50enum SessionExecutionLeaseReleasePolicy {
51    FinalCommit,
52    KeepOnAgentFrameSwitch,
53}
54
55impl SessionExecutionLeaseReleasePolicy {
56    fn should_release(self, outcome: &TurnOutcome) -> bool {
57        match self {
58            Self::FinalCommit => true,
59            Self::KeepOnAgentFrameSwitch => {
60                !matches!(outcome, TurnOutcome::AgentFrameSwitch { .. })
61            }
62        }
63    }
64}
65
66fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
67    match payload {
68        crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
69        crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
70    }
71}
72
73fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
74    claim
75        .batches
76        .iter()
77        .map(|batch| batch.batch_id.clone())
78        .collect()
79}
80
/// Builds the id of a phase-scoped child turn as `<parent_turn_id>:<phase>`.
fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
    let mut id = String::with_capacity(parent_turn_id.len() + 1 + phase.len());
    id.push_str(parent_turn_id);
    id.push(':');
    id.push_str(phase);
    id
}
84
85fn scoped_child_turn_controller<'run>(
86    scoped_effect_controller: &'run ScopedEffectController<'_>,
87    session_id: &str,
88    turn_id: &str,
89) -> Result<ScopedEffectController<'run>, RuntimeError> {
90    ScopedEffectController::borrowed(
91        scoped_effect_controller.controller(),
92        ExecutionScope::turn(session_id, turn_id),
93    )
94}
95
96pub(in crate::runtime) fn queued_work_trace_payload(
97    boundary: crate::QueuedWorkClaimBoundary,
98    claim: &crate::QueuedWorkClaim,
99    causes: &[crate::TurnCause],
100) -> serde_json::Value {
101    serde_json::json!({
102        "boundary": boundary,
103        "claim_id": claim.claim_id,
104        "owner_id": claim.owner.owner_id,
105        "incarnation_id": claim.owner.incarnation_id,
106        "batch_ids": queued_work_batch_ids(claim),
107        "payload_types": claim.batches.iter()
108            .flat_map(|batch| batch.items.iter())
109            .map(|item| queued_work_payload_type(&item.payload))
110            .collect::<Vec<_>>(),
111        "causes": causes,
112    })
113}
114
115pub(in crate::runtime) fn queued_work_completion_trace_payload(
116    completions: &[crate::QueuedWorkCompletion],
117) -> serde_json::Value {
118    serde_json::json!({
119        "claims": completions.iter().map(|completion| {
120            serde_json::json!({
121                "session_id": completion.session_id,
122                "claim_id": completion.claim_id,
123                "batch_ids": completion.batch_ids,
124            })
125        }).collect::<Vec<_>>(),
126    })
127}
128
129async fn emit_queued_work_started_to_sink(
130    events: &dyn TurnActivitySink,
131    boundary: crate::QueuedWorkClaimBoundary,
132    claim: &crate::QueuedWorkClaim,
133    causes: Vec<crate::TurnCause>,
134) {
135    emit_turn_activity_to_sink(
136        events,
137        TurnActivity::independent(TurnEvent::QueuedWorkStarted {
138            boundary,
139            batch_ids: queued_work_batch_ids(claim),
140            causes,
141        }),
142    )
143    .await;
144}
145
146pub(in crate::runtime) async fn send_queued_work_started_event(
147    event_tx: &mpsc::Sender<RuntimeStreamEvent>,
148    boundary: crate::QueuedWorkClaimBoundary,
149    claim: &crate::QueuedWorkClaim,
150    causes: Vec<crate::TurnCause>,
151) {
152    send_turn_activity(
153        event_tx,
154        TurnActivityId::fresh(),
155        TurnEvent::QueuedWorkStarted {
156            boundary,
157            batch_ids: queued_work_batch_ids(claim),
158            causes,
159        },
160    )
161    .await;
162}
163
/// Everything `LashRuntime::finish_turn` consumes to finalize, persist and
/// trace a turn once its execution has ended.
164struct TurnFinishInput {
    /// Turn pipeline whose state is finalized, committed and then adopted as
    /// the runtime's new state.
165    turn_pipeline: TurnBoundary,
    /// Assembler holding the turn's token usage; it produces the assembled turn.
166    assembler: TurnAssembler,
    /// Messages handed to the pipeline when the turn read state is finalized.
167    new_messages: crate::MessageSequence,
    /// Policy supplying the model id for the usage ledger entry and the
    /// provider used to normalize prompt usage.
168    policy: RuntimeSessionPolicy,
    /// Turn index written into the pipeline state before assembly.
169    turn_index: usize,
    /// Queued work completions passed to the final commit and traced afterwards.
170    queued_work_completions: Vec<crate::QueuedWorkCompletion>,
    /// Turn input completions passed to the final commit.
171    turn_input_completions: Vec<crate::TurnInputCompletion>,
    /// Turn id attached to the commit and to every trace event of this turn.
172    trace_turn_id: String,
173}
174
175impl LashRuntime {
176    fn max_context_tokens(&self) -> usize {
177        self.state.effective_policy().context_window_tokens()
178    }
179
180    async fn claim_session_execution_lease(
181        &self,
182        cancel: CancellationToken,
183        busy_is_error: bool,
184    ) -> Result<Option<SessionExecutionLeaseGuard>, RuntimeError> {
185        let Some(store) = self
186            .session
187            .as_ref()
188            .and_then(|session| session.history_store())
189        else {
190            return Ok(None);
191        };
192        match SessionExecutionLeaseGuard::try_acquire(
193            store,
194            &self.state.session_id,
195            &self.runtime_lease_owner,
196            Arc::clone(&self.host.core.clock),
197            cancel,
198        )
199        .await
200        .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?
201        {
202            Some(lease) => Ok(Some(lease)),
203            None if busy_is_error => Err(RuntimeError::new(
204                RuntimeErrorCode::SessionExecutionBusy,
205                format!(
206                    "session `{}` is already executing on another runtime owner",
207                    self.state.session_id
208                ),
209            )),
210            None => Ok(None),
211        }
212    }
213
214    async fn settle_session_execution_lease<T>(
215        &self,
216        guard: Option<&SessionExecutionLeaseGuard>,
217        result: Result<T, RuntimeError>,
218    ) -> Result<T, RuntimeError> {
219        match result {
220            Ok(value) => {
221                if let Some(guard) = guard {
222                    guard.release_if_live().await.map_err(|err| {
223                        RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
224                    })?;
225                }
226                Ok(value)
227            }
228            Err(err) => {
229                if err.code != RuntimeErrorCode::StoreCommitFailed
230                    && let Some(guard) = guard
231                    && let Err(release_err) = guard.release_if_live().await
232                {
233                    tracing::warn!(
234                        error = %release_err,
235                        "failed to release session execution lease after runtime error"
236                    );
237                }
238                Err(err)
239            }
240        }
241    }
242
243    async fn ensure_session_execution_lease_live(
244        &self,
245        guard: Option<&SessionExecutionLeaseGuard>,
246    ) -> Result<(), RuntimeError> {
247        let Some(guard) = guard else {
248            return Ok(());
249        };
250        guard.refresh_or_mark_lost().await.map_err(|err| {
251            RuntimeError::new(
252                RuntimeErrorCode::SessionExecutionLeaseLost,
253                format!(
254                    "session execution lease for session `{}` was lost before commit: {err}",
255                    self.state.session_id
256                ),
257            )
258        })
259    }
260
261    async fn abandon_queued_work_claims_after_lease_loss(
262        &self,
263        err: &RuntimeError,
264        claims: &[crate::QueuedWorkClaim],
265    ) {
266        if err.code != RuntimeErrorCode::SessionExecutionLeaseLost || claims.is_empty() {
267            return;
268        }
269        let Some(store) = self
270            .session
271            .as_ref()
272            .and_then(|session| session.history_store())
273        else {
274            return;
275        };
276        for claim in claims {
277            if let Err(abandon_err) = store.abandon_queued_work_claim(claim).await {
278                tracing::warn!(
279                    error = %abandon_err,
280                    session_id = %claim.session_id,
281                    claim_id = %claim.claim_id,
282                    "failed to abandon queued work claim after session execution lease loss"
283                );
284            }
285        }
286    }
287
288    async fn abandon_turn_input_claims_after_lease_loss(
289        &self,
290        err: &RuntimeError,
291        claims: &[crate::TurnInputClaim],
292    ) {
293        if err.code != RuntimeErrorCode::SessionExecutionLeaseLost || claims.is_empty() {
294            return;
295        }
296        let Some(store) = self
297            .session
298            .as_ref()
299            .and_then(|session| session.history_store())
300        else {
301            return;
302        };
303        for claim in claims {
304            if let Err(abandon_err) = store.abandon_turn_input_claim(claim).await {
305                tracing::warn!(
306                    error = %abandon_err,
307                    session_id = %claim.session_id,
308                    claim_id = %claim.claim_id,
309                    "failed to abandon turn input claim after session execution lease loss"
310                );
311            }
312        }
313    }
314
315    #[doc(hidden)]
316    pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
317        self.turn_phase_probe = Some(probe);
318    }
319
320    fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
321        if let Some(probe) = self.turn_phase_probe.as_ref() {
322            probe.begin(phase);
323        }
324    }
325
326    fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
327        if let Some(probe) = self.turn_phase_probe.as_ref() {
328            probe.end(phase);
329        }
330    }
331
332    async fn finish_turn(
333        &mut self,
334        finish: TurnFinishInput,
335        events: &dyn EventSink,
336        scoped_effect_controller: &ScopedEffectController<'_>,
337        cancel_state: &CancellationToken,
338        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
339        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
340    ) -> Result<AssembledTurn, RuntimeError> {
341        let TurnFinishInput {
342            mut turn_pipeline,
343            assembler,
344            new_messages,
345            policy,
346            turn_index,
347            queued_work_completions,
348            turn_input_completions,
349            trace_turn_id,
350        } = finish;
351        self.policy = self.state.effective_policy().clone();
352        turn_pipeline.state_mut().policy = self.policy.clone();
353        turn_pipeline.state_mut().turn_index = turn_index;
354
355        let mut turn_usage_delta = {
356            let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
357            std::mem::take(&mut *ledger)
358        };
359        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
360            turn_usage_delta.push(TokenLedgerEntry {
361                source: "turn".to_string(),
362                model: policy.model.id.clone(),
363                usage: assembler.token_usage.clone(),
364            });
365        }
366        let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
367
368        turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
369        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
370            turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
371        }
372
373        let last_prompt_usage = assembler
374            .last_llm_usage()
375            .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
376        turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
377        let assembled_state = turn_pipeline.export_state_for_assembly();
378        let assembled = assembler.finish(
379            assembled_state,
380            cancel_state.is_cancelled(),
381            None,
382            &self.host.core.control.termination,
383        );
384
385        let Some(session) = self.session.as_ref() else {
386            self.state.apply_snapshot(&assembled.state);
387            self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
388            return Ok(assembled);
389        };
390        self.ensure_session_execution_lease_live(session_execution_lease)
391            .await?;
392
393        let plugins = Arc::clone(session.plugins());
394        let manager = match self.runtime_session_services_for_turn(None) {
395            Ok(manager) => manager,
396            Err(err) => {
397                return Err(RuntimeError::new(
398                    RuntimeErrorCode::PluginSessionManager,
399                    err.to_string(),
400                ));
401            }
402        };
403
404        self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
405        let finalized = match plugins
406            .finalize_turn_with_phase_probe(
407                assembled,
408                manager.state_service(),
409                manager.lifecycle_service(),
410                manager.graph_service(),
411                self.turn_phase_probe.clone(),
412            )
413            .await
414        {
415            Ok(finalized) => finalized,
416            Err(err) => {
417                self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
418                return Err(RuntimeError::new(
419                    RuntimeErrorCode::PluginFinalizeTurn,
420                    err.to_string(),
421                ));
422            }
423        };
424        self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
425        self.ensure_session_execution_lease_live(session_execution_lease)
426            .await?;
427
428        let mut returned_turn = finalized.turn;
429        let release_session_execution_lease =
430            session_execution_lease_release_policy.should_release(&returned_turn.outcome);
431        self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
432        self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
433        let queued_work_completion_trace = queued_work_completions.clone();
434        let pending_attachment_ids = self
435            .host
436            .core
437            .durability
438            .attachment_store
439            .pending_manifest_commit_ids();
440        if let Err(err) = turn_pipeline
441            .final_commit(
442                &mut returned_turn,
443                self.session.as_mut(),
444                &turn_usage_delta,
445                Some(&trace_turn_id),
446                queued_work_completions,
447                turn_input_completions,
448                cancel_state.is_cancelled().then(|| trace_turn_id.clone()),
449                pending_attachment_ids.clone(),
450                release_session_execution_lease
451                    .then(|| session_execution_lease.map(SessionExecutionLeaseGuard::completion))
452                    .flatten(),
453            )
454            .await
455        {
456            self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
457            self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
458            return Err(err);
459        }
460        if release_session_execution_lease && let Some(lease) = session_execution_lease {
461            lease.mark_released();
462        }
463        self.host
464            .core
465            .durability
466            .attachment_store
467            .mark_manifest_committed(&pending_attachment_ids);
468        self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
469
470        emit_session_events_to_sink(events, finalized.events).await;
471        self.state = turn_pipeline.into_final_state();
472        if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
473            && let Some(session) = self.session.as_mut()
474        {
475            let protocol_session = Arc::clone(session.plugins().protocol_session());
476            let session_id = self.state.session_id.clone();
477            protocol_session
478                .restore_session(
479                    crate::plugin::ProtocolSessionContext::new(session, &session_id),
480                    &self.state,
481                )
482                .await
483                .map_err(|err| {
484                    RuntimeError::new(
485                        RuntimeErrorCode::Other("protocol_restore_session".to_string()),
486                        err.to_string(),
487                    )
488                })?;
489        }
490        if !queued_work_completion_trace.is_empty() {
491            crate::trace::emit_trace(
492                &self.host.core.tracing.trace_sink,
493                &self.host.core.tracing.trace_context,
494                lash_trace::TraceContext::default()
495                    .for_session(returned_turn.state.session_id.clone())
496                    .for_turn_index(returned_turn.state.turn_index)
497                    .for_turn(trace_turn_id.clone()),
498                lash_trace::TraceEvent::Custom {
499                    name: "queued_work.completed".to_string(),
500                    payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
501                },
502                self.host.core.clock.as_ref(),
503            );
504        }
505        self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
506        self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
507            .await?;
508        self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
509        self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
510
511        self.emit_completed_turn_trace(
512            &returned_turn.state,
513            &returned_turn.outcome,
514            &trace_turn_id,
515        );
516        Ok(returned_turn)
517    }
518
519    fn emit_completed_turn_trace(
520        &self,
521        state: &SessionSnapshot,
522        outcome: &TurnOutcome,
523        trace_turn_id: &str,
524    ) {
525        if self.host.core.tracing.trace_sink.is_none() {
526            return;
527        }
528
529        let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
530        crate::trace::emit_trace(
531            &self.host.core.tracing.trace_sink,
532            &self.host.core.tracing.trace_context,
533            lash_trace::TraceContext::default()
534                .for_session(state.session_id.clone())
535                .for_turn_index(state.turn_index)
536                .for_turn(trace_turn_id.to_string()),
537            lash_trace::TraceEvent::TurnCompleted {
538                status: status.to_string(),
539                done_reason: done_reason.to_string(),
540                agent_frame_switch,
541            },
542            self.host.core.clock.as_ref(),
543        );
544    }
545
546    async fn emit_turn_persisted_event(
547        &self,
548        returned_turn: &AssembledTurn,
549        scoped_effect_controller: &ScopedEffectController<'_>,
550        trace_turn_id: &str,
551    ) -> Result<(), RuntimeError> {
552        let Some(session) = self.session.as_ref() else {
553            return Ok(());
554        };
555        let Ok(manager) = self.runtime_session_services() else {
556            return Ok(());
557        };
558        let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
559        let phase_controller = scoped_child_turn_controller(
560            scoped_effect_controller,
561            &self.state.session_id,
562            &phase_turn_id,
563        )?;
564        let direct_completions = manager.direct_completion_client(
565            RuntimeEffectControllerHandle::borrowed(phase_controller),
566            Some(phase_turn_id),
567        );
568
569        session
570            .plugins()
571            .emit_runtime_event_with_phase_probe(
572                crate::PluginLifecycleEvent::TurnPersisted(Box::new(
573                    crate::SessionStateChangedContext {
574                        session_id: self.state.session_id.clone(),
575                        state: crate::SessionReadView::from_snapshot(&returned_turn.state),
576                        sessions: manager.state_service(),
577                        session_graph: manager.graph_service(),
578                        direct_completions,
579                    },
580                )),
581                self.turn_phase_probe.clone(),
582            )
583            .await;
584        Ok(())
585    }
586
587    /// Run a single turn and stream events to the host sink.
588    pub async fn stream_turn(
589        &mut self,
590        input: TurnInput,
591        opts: TurnOptions<'_>,
592    ) -> Result<AssembledTurn, RuntimeError> {
593        let cancel = opts.cancel.clone();
594        let session_execution_lease = self
595            .claim_session_execution_lease(cancel.clone(), true)
596            .await?;
597        let scoped_effect_controller = opts.scoped_effect_controller();
598        let result = self
599            .stream_turn_with_scoped_effect_controller_inner(
600                input,
601                opts.events_or_noop(),
602                opts.turn_events_or_noop(),
603                scoped_effect_controller,
604                cancel,
605                None,
606                None,
607                session_execution_lease.as_ref(),
608                SessionExecutionLeaseReleasePolicy::FinalCommit,
609            )
610            .await;
611        self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
612            .await
613    }
614
615    pub async fn stream_next_queued_work(
616        &mut self,
617        opts: TurnOptions<'_>,
618    ) -> Result<Option<AssembledTurn>, RuntimeError> {
619        self.stream_queued_work(opts, None).await
620    }
621
622    pub async fn stream_selected_queued_work(
623        &mut self,
624        opts: TurnOptions<'_>,
625        batch_ids: &[String],
626    ) -> Result<Option<AssembledTurn>, RuntimeError> {
627        self.stream_queued_work(opts, Some(batch_ids)).await
628    }
629
630    async fn stream_queued_work(
631        &mut self,
632        opts: TurnOptions<'_>,
633        selected_batch_ids: Option<&[String]>,
634    ) -> Result<Option<AssembledTurn>, RuntimeError> {
635        let cancel = opts.cancel.clone();
636        let Some(session_execution_lease) = self
637            .claim_session_execution_lease(cancel.clone(), false)
638            .await?
639        else {
640            return Ok(None);
641        };
642        let session_execution_fence = session_execution_lease.fence();
643        let Some(store) = self
644            .session
645            .as_ref()
646            .and_then(|session| session.history_store())
647        else {
648            session_execution_lease
649                .release_if_live()
650                .await
651                .map_err(|err| {
652                    RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
653                })?;
654            return Ok(None);
655        };
656        let drain_commands_before_turn_input = if selected_batch_ids.is_some() {
657            true
658        } else {
659            self.session_commands_precede_pending_turn_input(store.as_ref())
660                .await?
661        };
662        if drain_commands_before_turn_input {
663            loop {
664                match self
665                    .drain_next_session_command(&session_execution_fence)
666                    .await
667                {
668                    Ok(Some(_)) => {}
669                    Ok(None) => break,
670                    Err(err) => {
671                        let _ = session_execution_lease.release_if_live().await;
672                        return Err(err);
673                    }
674                }
675            }
676        }
677        if selected_batch_ids.is_none() {
678            let input_claim = store
679                .claim_next_turn_inputs(
680                    &self.state.session_id,
681                    &session_execution_fence,
682                    &self.runtime_lease_owner,
683                    crate::TURN_INPUT_CLAIM_TTL_MS,
684                    64,
685                )
686                .await
687                .map_err(super::runtime_error_from_store_commit)?;
688            if let Some(input_claim) = input_claim {
689                let mut input = input_claim.materialize_for_turn();
690                let turn_id = input
691                    .trace_turn_id
692                    .clone()
693                    .or_else(|| Some(opts.execution_scope_id().to_owned()))
694                    .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
695                input.trace_turn_id = Some(turn_id.clone());
696                crate::trace::emit_trace(
697                    &self.host.core.tracing.trace_sink,
698                    &self.host.core.tracing.trace_context,
699                    lash_trace::TraceContext::default()
700                        .for_session(self.state.session_id.clone())
701                        .for_turn_index(self.state.turn_index + 1)
702                        .for_turn(turn_id.clone()),
703                    lash_trace::TraceEvent::Custom {
704                        name: "turn_input.claimed".to_string(),
705                        payload: serde_json::json!({
706                            "claim_id": &input_claim.claim_id,
707                            "input_ids": input_claim.inputs.iter().map(|input| input.input_id.clone()).collect::<Vec<_>>(),
708                        }),
709                    },
710                    self.host.core.clock.as_ref(),
711                );
712                let claim_for_abandon = input_claim.clone();
713                let scoped_effect_controller = opts.scoped_effect_controller();
714                let result = self
715                    .stream_turn_with_scoped_effect_controller_inner(
716                        input,
717                        opts.events_or_noop(),
718                        opts.turn_events_or_noop(),
719                        scoped_effect_controller,
720                        cancel,
721                        None,
722                        Some(input_claim),
723                        Some(&session_execution_lease),
724                        SessionExecutionLeaseReleasePolicy::FinalCommit,
725                    )
726                    .await
727                    .map(Some);
728                if let Err(err) = &result {
729                    self.abandon_turn_input_claims_after_lease_loss(
730                        err,
731                        std::slice::from_ref(&claim_for_abandon),
732                    )
733                    .await;
734                }
735                return self
736                    .settle_session_execution_lease(Some(&session_execution_lease), result)
737                    .await;
738            }
739        }
740        let claim = if let Some(batch_ids) = selected_batch_ids {
741            store
742                .claim_ready_queued_work_by_batch_ids(
743                    &self.state.session_id,
744                    &session_execution_fence,
745                    &self.runtime_lease_owner,
746                    crate::QueuedWorkClaimBoundary::Idle,
747                    crate::QUEUED_WORK_CLAIM_TTL_MS,
748                    batch_ids,
749                )
750                .await
751        } else {
752            store
753                .claim_ready_queued_work(
754                    &self.state.session_id,
755                    &session_execution_fence,
756                    &self.runtime_lease_owner,
757                    crate::QueuedWorkClaimBoundary::Idle,
758                    crate::QUEUED_WORK_CLAIM_TTL_MS,
759                    64,
760                )
761                .await
762        }
763        .map_err(super::runtime_error_from_store_commit)?;
764        let Some(claim) = claim else {
765            session_execution_lease
766                .release_if_live()
767                .await
768                .map_err(|err| {
769                    RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
770                })?;
771            return Ok(None);
772        };
773        let mut work = claim.materialize_for_turn();
774        let turn_id = work
775            .input
776            .trace_turn_id
777            .clone()
778            .or_else(|| Some(opts.execution_scope_id().to_owned()))
779            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
780        work.input.trace_turn_id = Some(turn_id.clone());
781        let causes = work.turn_causes.clone();
782        emit_queued_work_started_to_sink(
783            opts.turn_events_or_noop(),
784            crate::QueuedWorkClaimBoundary::Idle,
785            &claim,
786            causes.clone(),
787        )
788        .await;
789        crate::trace::emit_trace(
790            &self.host.core.tracing.trace_sink,
791            &self.host.core.tracing.trace_context,
792            lash_trace::TraceContext::default()
793                .for_session(self.state.session_id.clone())
794                .for_turn_index(self.state.turn_index + 1)
795                .for_turn(turn_id.clone()),
796            lash_trace::TraceEvent::Custom {
797                name: "queued_work.claimed".to_string(),
798                payload: queued_work_trace_payload(
799                    crate::QueuedWorkClaimBoundary::Idle,
800                    &claim,
801                    &causes,
802                ),
803            },
804            self.host.core.clock.as_ref(),
805        );
806        let claim_for_abandon = claim.clone();
807        let scoped_effect_controller = opts.scoped_effect_controller();
808        let result = self
809            .stream_turn_with_scoped_effect_controller_inner(
810                work.input,
811                opts.events_or_noop(),
812                opts.turn_events_or_noop(),
813                scoped_effect_controller,
814                cancel,
815                Some(claim),
816                None,
817                Some(&session_execution_lease),
818                SessionExecutionLeaseReleasePolicy::FinalCommit,
819            )
820            .await
821            .map(Some);
822        if let Err(err) = &result {
823            self.abandon_queued_work_claims_after_lease_loss(
824                err,
825                std::slice::from_ref(&claim_for_abandon),
826            )
827            .await;
828        }
829        self.settle_session_execution_lease(Some(&session_execution_lease), result)
830            .await
831    }
832
833    async fn session_commands_precede_pending_turn_input(
834        &self,
835        store: &dyn crate::RuntimePersistence,
836    ) -> Result<bool, RuntimeError> {
837        let pending_inputs = store
838            .list_pending_turn_inputs(&self.state.session_id)
839            .await
840            .map_err(super::runtime_error_from_store_commit)?;
841        let earliest_input = pending_inputs
842            .iter()
843            .filter(|input| input.state.is_next_turn_pending())
844            .min_by_key(|input| (input.enqueued_at_ms, input.enqueue_seq));
845        let queued_work = store
846            .list_pending_queued_work(&self.state.session_id)
847            .await
848            .map_err(super::runtime_error_from_store_commit)?;
849        let earliest_command = queued_work
850            .iter()
851            .filter(|batch| batch.is_session_command_work())
852            .min_by_key(|batch| (batch.enqueued_at_ms, batch.enqueue_seq));
853        Ok(match (earliest_command, earliest_input) {
854            (Some(command), Some(input)) => command.enqueued_at_ms < input.enqueued_at_ms,
855            (Some(_), None) => true,
856            _ => false,
857        })
858    }
859
860    /// Enforce the durable-first wiring invariant at a turn-scope boundary: when
861    /// the host wired a durable effect host, every store reachable from this
862    /// scope must also be durable. A durable host running against any ephemeral
863    /// store fails loudly here rather than silently degrading.
864    ///
865    /// Inline controllers (the default tier) impose no requirement, so
866    /// inline/in-memory hosts pass unchanged.
867    fn ensure_durable_store_facets_for_scope(
868        &self,
869        scoped_effect_controller: &ScopedEffectController<'_>,
870    ) -> Result<(), RuntimeError> {
871        if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
872        {
873            return Ok(());
874        }
875        if self
876            .host
877            .core
878            .durability
879            .attachment_store
880            .persistence()
881            .durability_tier()
882            != crate::DurabilityTier::Durable
883        {
884            return Err(RuntimeError::durable_store_required(
885                crate::DurableStoreFacet::AttachmentStore,
886            ));
887        }
888        if self
889            .host
890            .core
891            .durability
892            .process_env_store
893            .durability_tier()
894            != crate::DurabilityTier::Durable
895        {
896            return Err(RuntimeError::durable_store_required(
897                crate::DurableStoreFacet::ProcessEnvStore,
898            ));
899        }
900        if let Some(store) = self
901            .session
902            .as_ref()
903            .and_then(|session| session.history_store())
904            && store.durability_tier() != crate::DurabilityTier::Durable
905        {
906            return Err(RuntimeError::durable_store_required(
907                crate::DurableStoreFacet::SessionStore,
908            ));
909        }
910        if let Some(process_registry) = self.host.process_registry.as_ref()
911            && process_registry.durability_tier() != crate::DurabilityTier::Durable
912        {
913            return Err(RuntimeError::durable_store_required(
914                crate::DurableStoreFacet::ProcessRegistry,
915            ));
916        }
917        Ok(())
918    }
919
920    #[allow(clippy::too_many_arguments)]
921    async fn stream_turn_with_scoped_effect_controller_inner(
922        &mut self,
923        mut input: TurnInput,
924        events: &dyn EventSink,
925        turn_events: &dyn TurnActivitySink,
926        scoped_effect_controller: ScopedEffectController<'_>,
927        cancel: CancellationToken,
928        queued_claim: Option<crate::QueuedWorkClaim>,
929        turn_input_claim: Option<crate::TurnInputClaim>,
930        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
931        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
932    ) -> Result<AssembledTurn, RuntimeError> {
933        if queued_claim.is_none() && turn_input_claim.is_none() {
934            if let Some(lease) = session_execution_lease {
935                while self
936                    .drain_next_session_command(&lease.fence())
937                    .await?
938                    .is_some()
939                {}
940            } else if self
941                .session
942                .as_ref()
943                .and_then(|session| session.history_store())
944                .is_some()
945            {
946                return Err(RuntimeError::new(
947                    RuntimeErrorCode::StoreCommitFailed,
948                    "session command drain requires a session execution lease",
949                ));
950            }
951        }
952        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
953            && scoped_effect_controller
954                .execution_scope()
955                .validates_turn_trace_id()
956            && input_turn_id != scoped_effect_controller.scope_id()
957        {
958            return Err(RuntimeError::new(
959                RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
960                format!(
961                    "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
962                    scoped_effect_controller.scope_id()
963                ),
964            ));
965        }
966        self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
967        input
968            .trace_turn_id
969            .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
970        self.stream_turn_inner(
971            input.clone(),
972            events,
973            turn_events,
974            scoped_effect_controller,
975            cancel.clone(),
976            queued_claim,
977            turn_input_claim,
978            session_execution_lease,
979            session_execution_lease_release_policy,
980        )
981        .await
982    }
983
984    /// Stream one logical host turn, following foreground AgentFrame switches
985    /// until a terminal outcome is reached.
986    ///
987    /// A protocol continuation creates a new frame in the same session. Hosts
988    /// that only care about the benchmark/app answer should not need to
989    /// special-case that intermediate outcome; this helper keeps driving the
990    /// same session through each frame's task with the normal runtime turn
991    /// guards.
992    pub async fn stream_turn_with_agent_frames(
993        &mut self,
994        input: TurnInput,
995        opts: TurnOptions<'_>,
996    ) -> Result<AgentFrameRun, RuntimeError> {
997        let cancel = opts.cancel.clone();
998        let session_execution_lease = self
999            .claim_session_execution_lease(cancel.clone(), true)
1000            .await?;
1001        let scoped_effect_controller = opts.scoped_effect_controller();
1002        let result = self
1003            .stream_turn_with_agent_frames_inner(
1004                input,
1005                opts.events_or_noop(),
1006                opts.turn_events_or_noop(),
1007                scoped_effect_controller,
1008                cancel,
1009                session_execution_lease.as_ref(),
1010            )
1011            .await;
1012        self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
1013            .await
1014    }
1015
1016    async fn stream_turn_with_agent_frames_inner(
1017        &mut self,
1018        mut input: TurnInput,
1019        events: &dyn EventSink,
1020        turn_events: &dyn TurnActivitySink,
1021        scoped_effect_controller: ScopedEffectController<'_>,
1022        cancel: CancellationToken,
1023        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
1024    ) -> Result<AgentFrameRun, RuntimeError> {
1025        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
1026            && scoped_effect_controller
1027                .execution_scope()
1028                .validates_turn_trace_id()
1029            && input_turn_id != scoped_effect_controller.scope_id()
1030        {
1031            return Err(RuntimeError::new(
1032                RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
1033                format!(
1034                    "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
1035                    scoped_effect_controller.scope_id()
1036                ),
1037            ));
1038        }
1039        let follow_protocol_turn_options = input.protocol_turn_options.clone();
1040        let follow_turn_context = input.turn_context.clone();
1041        let follow_trace_turn_id = input
1042            .trace_turn_id
1043            .clone()
1044            .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
1045        input
1046            .trace_turn_id
1047            .get_or_insert(follow_trace_turn_id.clone());
1048        let mut turns = Vec::new();
1049        loop {
1050            let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
1051            input.trace_turn_id = Some(turn_trace_turn_id.clone());
1052            let turn_effect_controller = if turns.is_empty() {
1053                scoped_effect_controller.clone()
1054            } else {
1055                ScopedEffectController::borrowed(
1056                    scoped_effect_controller.controller(),
1057                    ExecutionScope::turn(&self.state.session_id, &turn_trace_turn_id),
1058                )?
1059            };
1060            let turn = self
1061                .stream_turn_with_scoped_effect_controller_inner(
1062                    input,
1063                    events,
1064                    turn_events,
1065                    turn_effect_controller,
1066                    cancel.clone(),
1067                    None,
1068                    None,
1069                    session_execution_lease,
1070                    SessionExecutionLeaseReleasePolicy::KeepOnAgentFrameSwitch,
1071                )
1072                .await?;
1073            let switched_frame = match &turn.outcome {
1074                TurnOutcome::AgentFrameSwitch { frame_id, task } => {
1075                    Some((frame_id.clone(), task.clone()))
1076                }
1077                _ => None,
1078            };
1079            turns.push(turn);
1080
1081            let Some((_frame_id, task)) = switched_frame else {
1082                return Ok(AgentFrameRun { turns });
1083            };
1084            input = turn_input_from_text(task);
1085            input.protocol_turn_options = follow_protocol_turn_options.clone();
1086            input.turn_context = follow_turn_context.clone();
1087        }
1088    }
1089
    /// Turn `input` into a prepared message sequence and run it through
    /// `stream_prepared_turn_inner`.
    ///
    /// Visible steps, in order:
    /// 1. Refresh the session graph from the store.
    /// 2. If `input` has no items and no image blobs, substitute the
    ///    materialized turn-input claim, then (if still empty) the queued
    ///    work's input, keeping the caller's trace id when the substitute has
    ///    none.
    /// 3. For sessions with a history store, run `ensure_durable_effect_input`.
    /// 4. Validate the protocol turn extension, when one is supplied and a
    ///    session is present.
    /// 5. Normalize the input items. A normalization failure is reported as
    ///    events plus an `Ok` assembled turn with
    ///    `TurnStop::InvalidInput`, not as an `Err`.
    /// 6. Emit a `TurnStarted` trace event when a trace sink is configured.
    /// 7. Build the turn delta (turn causes, then the user message) and let
    ///    the plugin session prepare the turn context.
    /// 8. Install the prepared context overlay on the session and delegate.
    ///
    /// # Errors
    ///
    /// Returns a `RuntimeError` when the graph refresh, durable-input check,
    /// extension validation, session-service lookup, child controller
    /// creation, context preparation, or overlay installation fails, when no
    /// runtime session is available, or when the prepared turn itself fails.
    #[allow(clippy::too_many_arguments)]
    async fn stream_turn_inner(
        &mut self,
        mut input: TurnInput,
        events: &dyn EventSink,
        turn_events: &dyn TurnActivitySink,
        scoped_effect_controller: ScopedEffectController<'_>,
        cancel: CancellationToken,
        queued_claim: Option<crate::QueuedWorkClaim>,
        turn_input_claim: Option<crate::TurnInputClaim>,
        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
    ) -> Result<AssembledTurn, RuntimeError> {
        // Reload the session graph before reading any state; failures are
        // reported under the `session_head_refresh` error code.
        self.refresh_session_graph_from_store()
            .await
            .map_err(session_head_refresh_error)?;
        // Remember the caller's trace id so it survives `input` being
        // replaced by claimed work below.
        let input_trace_turn_id = input.trace_turn_id.clone();
        let queued_turn_work = queued_claim
            .as_ref()
            .map(crate::QueuedWorkClaim::materialize_for_turn);
        let pending_turn_input = turn_input_claim
            .as_ref()
            .map(crate::TurnInputClaim::materialize_for_turn);
        // A claimed turn input stands in for `input` only when the caller
        // supplied neither items nor image blobs.
        if let Some(work) = pending_turn_input.as_ref()
            && input.items.is_empty()
            && input.image_blobs.is_empty()
        {
            input = work.clone();
            if input.trace_turn_id.is_none() {
                input.trace_turn_id = input_trace_turn_id.clone();
            }
        }
        // Same substitution for queued work; it runs second, so it only
        // applies if `input` is still empty after the step above.
        if let Some(work) = queued_turn_work.as_ref()
            && input.items.is_empty()
            && input.image_blobs.is_empty()
        {
            input = work.input.clone();
            if input.trace_turn_id.is_none() {
                input.trace_turn_id = input_trace_turn_id;
            }
        }
        // Sessions backed by a history store additionally check the final
        // input with `ensure_durable_effect_input`.
        if self
            .session
            .as_ref()
            .and_then(|session| session.history_store())
            .is_some()
        {
            ensure_durable_effect_input(&input)?;
        }
        // The extension is only validated when a session is present.
        if let Some(extension) = &input.protocol_extension
            && let Some(session) = self.session.as_ref()
        {
            let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
            protocol_session
                .validate_turn_extension(extension)
                .await
                .map_err(|err| {
                    RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
                })?;
        }
        // Snapshot the prior prompt usage; `last_prompt_usage` is cleared
        // further down (on both the invalid-input and the normal path).
        let previous_prompt_usage = self.state.last_prompt_usage.clone();
        let normalized = match self
            .normalize_input_items(&input.items, &input.image_blobs)
            .await
        {
            Ok(items) => items,
            Err(e) => {
                // Invalid input ends the turn early: an Error event, a
                // `Stopped(InvalidInput)` outcome and `Done` are emitted and
                // mirrored into the assembler, and the assembled turn is
                // returned as `Ok`.
                self.state.last_prompt_usage = None;
                let mut assembler = TurnAssembler::default();
                let error_event = SessionEvent::Error {
                    message: e.clone(),
                    envelope: Some(crate::session_model::ErrorEnvelope {
                        kind: "input_validation".to_string(),
                        code: Some("invalid_turn_input".to_string()),
                        terminal_reason: None,
                        user_message: e.clone(),
                        raw: None,
                    }),
                };
                assembler.push(&error_event);
                emit_turn_activity_to_sink(
                    turn_events,
                    TurnActivity::independent(TurnEvent::Error { message: e }),
                )
                .await;
                emit_session_event_to_sink(events, error_event).await;
                let outcome_event = SessionEvent::TurnOutcome {
                    outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
                };
                assembler.push(&outcome_event);
                emit_session_event_to_sink(events, outcome_event).await;
                assembler.push(&SessionEvent::Done);
                emit_session_event_to_sink(events, SessionEvent::Done).await;
                return Ok(assembler.finish(
                    self.state.to_snapshot(),
                    false,
                    None,
                    &self.host.core.control.termination,
                ));
            }
        };
        let turn_index = self.state.turn_index + 1;
        // Fall back to a fresh UUID when no trace id reached this point.
        let trace_turn_id = input
            .trace_turn_id
            .clone()
            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
        // Trace metadata is only built when a trace sink is configured.
        if self.host.core.tracing.trace_sink.is_some() {
            let mut trace_metadata = std::collections::BTreeMap::new();
            trace_metadata.insert(
                "input_item_count".to_string(),
                serde_json::json!(normalized.len()),
            );
            crate::trace::emit_trace(
                &self.host.core.tracing.trace_sink,
                &self.host.core.tracing.trace_context,
                lash_trace::TraceContext::default()
                    .for_session(self.state.session_id.clone())
                    .for_turn_index(turn_index)
                    .for_turn(trace_turn_id.clone()),
                lash_trace::TraceEvent::TurnStarted {
                    metadata: trace_metadata,
                },
                self.host.core.clock.as_ref(),
            );
        }

        let base_read_model = self.state.read_model();
        let base_messages = base_read_model.messages;
        let base_render_cache = base_read_model.prompt_render_cache;
        let mut turn_delta = Vec::new();
        // Turn causes carried by queued work go first in the delta, ahead of
        // the user message built below.
        let initial_turn_causes = queued_turn_work
            .as_ref()
            .map(|work| work.turn_causes.clone())
            .unwrap_or_default();
        turn_delta.extend(
            initial_turn_causes
                .iter()
                .map(crate::TurnCause::to_event_message),
        );

        let user_id = fresh_message_id();
        let mut user_parts: Vec<Part> = Vec::new();
        for item in normalized {
            match item {
                NormalizedItem::Text(text) => {
                    // Empty text items contribute no part.
                    if text.is_empty() {
                        continue;
                    }
                    user_parts.push(Part {
                        id: format!("{}.p{}", user_id, user_parts.len()),
                        kind: PartKind::Text,
                        content: text,
                        attachment: None,
                        tool_call_id: None,
                        tool_name: None,
                        tool_replay: None,
                        prune_state: PruneState::Intact,
                        reasoning_meta: None,
                        response_meta: None,
                    });
                }
                NormalizedItem::Image(reference) => {
                    user_parts.push(Part {
                        id: format!("{}.p{}", user_id, user_parts.len()),
                        kind: PartKind::Image,
                        content: String::new(),
                        attachment: Some(crate::session_model::message::PartAttachment {
                            reference,
                        }),
                        tool_call_id: None,
                        tool_name: None,
                        tool_replay: None,
                        prune_state: PruneState::Intact,
                        reasoning_meta: None,
                        response_meta: None,
                    });
                }
            }
        }
        // With no parts and no turn causes, still record a user message that
        // holds a single empty text part.
        if user_parts.is_empty() && initial_turn_causes.is_empty() {
            user_parts.push(Part {
                id: format!("{}.p0", user_id),
                kind: PartKind::Text,
                content: String::new(),
                attachment: None,
                tool_call_id: None,
                tool_name: None,
                tool_replay: None,
                prune_state: PruneState::Intact,
                reasoning_meta: None,
                response_meta: None,
            });
        }
        // A turn driven purely by turn causes gets no user message at all.
        if !user_parts.is_empty() {
            reassign_part_ids(&user_id, &mut user_parts);
            turn_delta.push(Message {
                id: user_id.clone(),
                role: MessageRole::User,
                parts: shared_parts(user_parts),
                origin: None,
            });
        }

        let manager = self
            .runtime_session_services_for_turn(None)
            .map_err(|err| {
                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
            })?;
        let plugin_session = self
            .session
            .as_ref()
            .map(|s| Arc::clone(s.plugins()))
            .ok_or_else(|| {
                RuntimeError::new(
                    RuntimeErrorCode::ContextPrepareTurn,
                    "runtime session not available",
                )
            })?;
        // Direct completions issued while preparing context run under a
        // child controller scoped to the `prepare-turn` phase id.
        let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
        let prepare_phase_controller = scoped_child_turn_controller(
            &scoped_effect_controller,
            &self.state.session_id,
            &prepare_phase_turn_id,
        )?;
        let turn_ctx = crate::TurnTransformContext {
            session_id: self.state.session_id.clone(),
            state: self.read_view(),
            prompt_usage: previous_prompt_usage.clone(),
            max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
            sessions: manager.state_service(),
            session_lifecycle: manager.lifecycle_service(),
            session_graph: manager.graph_service(),
            scoped_effect_controller: scoped_effect_controller.clone(),
            direct_completions: manager.direct_completion_client(
                RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
                Some(prepare_phase_turn_id),
            ),
        };
        // NOTE(review): if `prepare_turn_context` fails, the `?` below
        // returns before `mark_phase_end` runs — confirm that is intended.
        self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
        let prepared_context = plugin_session
            .prepare_turn_context(
                &turn_ctx,
                crate::session_model::context::PreparedContext {
                    messages: crate::MessageSequence::from_base_and_delta(
                        base_messages,
                        turn_delta,
                    )
                    .with_base_render_cache(base_render_cache),
                    ..Default::default()
                },
                self.turn_phase_probe.clone(),
            )
            .await
            .map_err(|err| {
                RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
            })?;
        self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
        // Release the read-view's graph clone before the rest of the turn
        // runs. Keeping it alive into `stream_prepared_turn` forces the
        // post-turn `append_active_read_delta` to deep-clone the session
        // graph (Arc::make_mut with refcount > 1).
        drop(turn_ctx);
        let messages = prepared_context.messages;
        // Install the tool providers and prompt contributions produced by
        // context preparation on the session for this turn.
        if let Some(session) = self.session.as_mut() {
            session
                .set_context_overlay(
                    prepared_context.tool_providers,
                    prepared_context.prompt_contributions,
                    prepared_context.include_base_tools,
                )
                .map_err(|err| {
                    RuntimeError::new(
                        RuntimeErrorCode::Other("session_tool_registry".to_string()),
                        err.to_string(),
                    )
                })?;
        }

        // The previous usage was captured above and is passed on explicitly.
        self.state.last_prompt_usage = None;
        // NOTE(review): the inner future is boxed, presumably to keep this
        // future's size down — confirm.
        Box::pin(self.stream_prepared_turn_inner(
            messages,
            previous_prompt_usage,
            input.protocol_turn_options.clone(),
            input.protocol_extension.clone(),
            input.turn_context.clone(),
            initial_turn_causes,
            trace_turn_id,
            turn_index,
            events,
            turn_events,
            scoped_effect_controller,
            cancel,
            queued_claim,
            turn_input_claim,
            session_execution_lease,
            session_execution_lease_release_policy,
        ))
        .await
    }
1389
1390    /// Run a single turn and return only the assembled terminal result.
1391    pub async fn run_turn_assembled(
1392        &mut self,
1393        input: TurnInput,
1394        cancel: CancellationToken,
1395        scoped_effect_controller: ScopedEffectController<'_>,
1396    ) -> Result<AssembledTurn, RuntimeError> {
1397        self.stream_turn(input, TurnOptions::new(cancel, scoped_effect_controller))
1398            .await
1399    }
1400
1401    /// Run a turn using host-prepared message history.
1402    #[allow(clippy::too_many_arguments)]
1403    pub async fn stream_prepared_turn(
1404        &mut self,
1405        messages: crate::MessageSequence,
1406        previous_prompt_usage: Option<PromptUsage>,
1407        protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1408        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1409        turn_context: crate::TurnContext,
1410        initial_turn_causes: Vec<crate::TurnCause>,
1411        trace_turn_id: String,
1412        turn_index: usize,
1413        events: &dyn EventSink,
1414        turn_events: &dyn TurnActivitySink,
1415        scoped_effect_controller: ScopedEffectController<'_>,
1416        cancel: CancellationToken,
1417        initial_queue_claim: Option<crate::QueuedWorkClaim>,
1418        initial_turn_input_claim: Option<crate::TurnInputClaim>,
1419    ) -> Result<AssembledTurn, RuntimeError> {
1420        let session_execution_lease = self
1421            .claim_session_execution_lease(cancel.clone(), true)
1422            .await?;
1423        let result = self
1424            .stream_prepared_turn_inner(
1425                messages,
1426                previous_prompt_usage,
1427                protocol_turn_options,
1428                protocol_extension,
1429                turn_context,
1430                initial_turn_causes,
1431                trace_turn_id,
1432                turn_index,
1433                events,
1434                turn_events,
1435                scoped_effect_controller,
1436                cancel,
1437                initial_queue_claim,
1438                initial_turn_input_claim,
1439                session_execution_lease.as_ref(),
1440                SessionExecutionLeaseReleasePolicy::FinalCommit,
1441            )
1442            .await;
1443        self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
1444            .await
1445    }
1446
1447    #[allow(clippy::too_many_arguments)]
1448    async fn stream_prepared_turn_inner(
1449        &mut self,
1450        messages: crate::MessageSequence,
1451        _previous_prompt_usage: Option<PromptUsage>,
1452        protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1453        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1454        turn_context: crate::TurnContext,
1455        initial_turn_causes: Vec<crate::TurnCause>,
1456        trace_turn_id: String,
1457        turn_index: usize,
1458        events: &dyn EventSink,
1459        turn_events: &dyn TurnActivitySink,
1460        scoped_effect_controller: ScopedEffectController<'_>,
1461        cancel: CancellationToken,
1462        initial_queue_claim: Option<crate::QueuedWorkClaim>,
1463        initial_turn_input_claim: Option<crate::TurnInputClaim>,
1464        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
1465        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
1466    ) -> Result<AssembledTurn, RuntimeError> {
1467        if session_execution_lease.is_none()
1468            && self
1469                .session
1470                .as_ref()
1471                .and_then(|session| session.history_store())
1472                .is_some()
1473        {
1474            return Err(RuntimeError::new(
1475                RuntimeErrorCode::StoreCommitFailed,
1476                "prepared turn requires a session execution lease",
1477            ));
1478        }
1479        let session_execution_fence =
1480            session_execution_lease.map(SessionExecutionLeaseGuard::fence);
1481        let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
1482        let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
1483        let mut turn_policy = self.state.effective_policy().clone();
1484        let turn_provider_override = turn_context.provider().cloned();
1485        if let Some(provider) = turn_provider_override.as_ref() {
1486            turn_policy.provider_id = provider.kind().to_string();
1487        }
1488        if let Some(model) = turn_context.model_spec() {
1489            turn_policy.model = model.clone();
1490        }
1491        let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
1492        let effective_protocol_turn_options = protocol_turn_options
1493            .clone()
1494            .map(|options| session_protocol_turn_options.merged_with_override(&options))
1495            .unwrap_or(session_protocol_turn_options);
1496        let manager = self
1497            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1498            .map_err(|err| {
1499                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1500            })?;
1501        let plugins = {
1502            let session = self
1503                .session
1504                .as_ref()
1505                .expect("lash runtime session must be available");
1506            Arc::clone(session.plugins())
1507        };
1508        let mut assembler = TurnAssembler::new();
1509        self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
1510        // Block-scope the pinned future so it (and its captured
1511        // `SessionReadView` clone of the session graph) drops before the
1512        // post-turn `append_active_read_delta` mutation. Keeping it alive
1513        // across the turn forces `Arc::make_mut` to deep-clone
1514        // `SessionGraphData`.
1515        let prepared = {
1516            let prepare_turn = plugins.prepare_turn_with_phase_probe(
1517                PrepareTurnRequest {
1518                    session_id: self.state.session_id.clone(),
1519                    state: crate::SessionReadView::from_runtime_state(
1520                        &self.state,
1521                        turn_policy.clone(),
1522                        effective_protocol_turn_options.clone(),
1523                    ),
1524                    messages,
1525                    sessions: manager.state_service(),
1526                    session_lifecycle: manager.lifecycle_service(),
1527                    session_graph: manager.graph_service(),
1528                    turn_context: turn_context.clone(),
1529                },
1530                self.turn_phase_probe.clone(),
1531            );
1532            let mut prepare_turn = Box::pin(prepare_turn);
1533
1534            loop {
1535                tokio::select! {
1536                    prepared = prepare_turn.as_mut() => {
1537                        let prepared = prepared.map_err(|err| {
1538                            RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
1539                        })?;
1540                        self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
1541                        break prepared;
1542                    }
1543                    maybe_event = event_rx.recv() => {
1544                        if let Some(event) = maybe_event {
1545                            emit_runtime_stream_event_to_sinks(
1546                                events,
1547                                turn_events,
1548                                event,
1549                                &mut assembler,
1550                            )
1551                            .await;
1552                        }
1553                    }
1554                }
1555            }
1556        };
1557        for event in &prepared.events {
1558            assembler.push(event);
1559        }
1560        emit_session_events_to_sink(events, prepared.events).await;
1561        if let Some(abort) = prepared.abort {
1562            drop(event_tx);
1563
1564            let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1565                self.state.clone(),
1566                Arc::clone(&self.host.core.clock),
1567            );
1568            turn_pipeline.apply_prepared_messages(&prepared.messages);
1569            let state = turn_pipeline.into_final_state();
1570            let issue = TurnIssue {
1571                kind: "plugin".to_string(),
1572                code: Some(abort.code),
1573                terminal_reason: None,
1574                message: abort.message.clone(),
1575                raw: None,
1576            };
1577            let error_event = SessionEvent::Error {
1578                message: abort.message,
1579                envelope: Some(crate::session_model::ErrorEnvelope {
1580                    kind: "plugin".to_string(),
1581                    code: issue.code.clone(),
1582                    terminal_reason: None,
1583                    user_message: issue.message.clone(),
1584                    raw: None,
1585                }),
1586            };
1587            assembler.push(&error_event);
1588            emit_turn_activity_to_sink(
1589                turn_events,
1590                TurnActivity::independent(TurnEvent::Error {
1591                    message: issue.message.clone(),
1592                }),
1593            )
1594            .await;
1595            emit_session_event_to_sink(events, error_event).await;
1596            let outcome_event = SessionEvent::TurnOutcome {
1597                outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
1598            };
1599            assembler.push(&outcome_event);
1600            emit_session_event_to_sink(events, outcome_event).await;
1601            assembler.push(&SessionEvent::Done);
1602            emit_session_event_to_sink(events, SessionEvent::Done).await;
1603            return Ok(assembler.finish(
1604                state.to_snapshot(),
1605                cancel.is_cancelled(),
1606                Some(issue),
1607                &self.host.core.control.termination,
1608            ));
1609        }
1610        let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1611            self.state.clone(),
1612            Arc::clone(&self.host.core.clock),
1613        )
1614        .with_session_execution_lease(session_execution_fence.clone());
1615        let store = self
1616            .session
1617            .as_ref()
1618            .and_then(|session| session.history_store());
1619        // Durable controllers, like Restate, own in-flight replay. Writing
1620        // progress checkpoints directly to the shared store would make handler
1621        // replay observe a newer partial turn and change effect replay keys.
1622        let progress_store = if scoped_effect_controller.controller().durability_tier()
1623            == crate::DurabilityTier::Durable
1624        {
1625            None
1626        } else {
1627            store.as_ref().map(|store| store.as_ref())
1628        };
1629        turn_pipeline
1630            .prepared_checkpoint(
1631                progress_store,
1632                turn_policy.clone(),
1633                turn_index,
1634                &prepared.messages,
1635                self.session.as_mut(),
1636            )
1637            .await
1638            .map_err(super::runtime_error_from_store_commit)?;
1639        let resolved_turn_policy = if let Some(provider) = turn_provider_override {
1640            RuntimeSessionPolicy::from_provider(
1641                turn_policy.clone(),
1642                provider.with_clock(Arc::clone(&self.host.core.clock)),
1643            )
1644            .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1645        } else {
1646            self.host
1647                .resolve_session_policy(&self.state.session_id, turn_policy.clone())
1648                .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1649        };
1650        let manager = self
1651            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1652            .map_err(|err| {
1653                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1654            })?;
1655        let cancel_state = cancel.clone();
1656        let finish_scoped_effect_controller = scoped_effect_controller.clone();
1657        let session = self
1658            .session
1659            .take()
1660            .expect("lash runtime session must be available");
1661        let mut driver = RuntimeTurnDriver {
1662            session,
1663            policy: resolved_turn_policy,
1664            host: self.host.clone(),
1665            turn_id: scoped_effect_controller.scope_id().to_string(),
1666            scoped_effect_controller,
1667            session_id: self.state.session_id.clone(),
1668            turn_index,
1669            turn_pipeline,
1670            llm_stream_summaries: HashMap::new(),
1671            next_llm_ordinal: 0,
1672            session_services: manager,
1673            protocol_turn_options: effective_protocol_turn_options,
1674            protocol_extension,
1675            turn_context,
1676            turn_causes: initial_turn_causes,
1677            pending_queue_claims: initial_queue_claim.into_iter().collect(),
1678            pending_turn_input_claims: initial_turn_input_claim.into_iter().collect(),
1679            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1680            session_execution_lease: session_execution_fence,
1681            runtime_lease_owner: self.runtime_lease_owner.clone(),
1682            turn_phase_probe: self.turn_phase_probe.clone(),
1683        };
1684        let protocol_run_offset = 0;
1685        self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
1686        let run_result = drive_turn_to_completion(
1687            driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
1688            &mut event_rx,
1689            &mut assembler,
1690            &child_usage_event_relay,
1691            events,
1692            turn_events,
1693        )
1694        .await;
1695        let (new_messages, _new_protocol_iteration) = match run_result {
1696            Ok(result) => result,
1697            Err(err) => {
1698                self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1699                let RuntimeTurnDriver {
1700                    session,
1701                    pending_queue_claims,
1702                    pending_turn_input_claims,
1703                    ..
1704                } = driver;
1705                self.session = Some(session);
1706                self.abandon_queued_work_claims_after_lease_loss(&err, &pending_queue_claims)
1707                    .await;
1708                self.abandon_turn_input_claims_after_lease_loss(&err, &pending_turn_input_claims)
1709                    .await;
1710                return Err(err);
1711            }
1712        };
1713        self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1714        tracing::debug!(
1715            new_message_count = new_messages.len(),
1716            tool_call_count = assembler.tool_calls.len(),
1717            "runtime post-run_task"
1718        );
1719
1720        let RuntimeTurnDriver {
1721            session,
1722            policy,
1723            turn_pipeline,
1724            pending_queue_claims,
1725            pending_turn_input_claims,
1726            ..
1727        } = driver;
1728        self.session = Some(session);
1729        let pending_queue_claims_for_abandon = pending_queue_claims.clone();
1730        let pending_turn_input_claims_for_abandon = pending_turn_input_claims.clone();
1731        let finish_result = self
1732            .finish_turn(
1733                TurnFinishInput {
1734                    turn_pipeline,
1735                    assembler,
1736                    new_messages,
1737                    policy,
1738                    turn_index,
1739                    queued_work_completions: pending_queue_claims
1740                        .iter()
1741                        .map(crate::QueuedWorkClaim::completion)
1742                        .collect(),
1743                    turn_input_completions: pending_turn_input_claims
1744                        .iter()
1745                        .map(crate::TurnInputClaim::completion)
1746                        .collect(),
1747                    trace_turn_id,
1748                },
1749                events,
1750                &finish_scoped_effect_controller,
1751                &cancel_state,
1752                session_execution_lease,
1753                session_execution_lease_release_policy,
1754            )
1755            .await;
1756        if let Err(err) = &finish_result {
1757            self.abandon_queued_work_claims_after_lease_loss(
1758                err,
1759                &pending_queue_claims_for_abandon,
1760            )
1761            .await;
1762            self.abandon_turn_input_claims_after_lease_loss(
1763                err,
1764                &pending_turn_input_claims_for_abandon,
1765            )
1766            .await;
1767        }
1768        finish_result
1769    }
1770    async fn normalize_input_items(
1771        &self,
1772        items: &[InputItem],
1773        image_blobs: &HashMap<String, Vec<u8>>,
1774    ) -> Result<Vec<NormalizedItem>, String> {
1775        normalize_input_items(
1776            items,
1777            image_blobs,
1778            self.host.core.durability.attachment_store.as_ref(),
1779        )
1780        .await
1781    }
1782}
1783
1784fn turn_input_from_text(text: String) -> TurnInput {
1785    TurnInput {
1786        items: vec![InputItem::Text { text }],
1787        image_blobs: HashMap::new(),
1788        protocol_turn_options: None,
1789        trace_turn_id: None,
1790        protocol_extension: None,
1791        turn_context: crate::TurnContext::default(),
1792    }
1793}
1794
/// Derives the turn id for a turn that follows `completed_turn_count`
/// already-completed turns under `root_turn_id`.
///
/// With no completed turns the root id is returned unchanged; otherwise the
/// id is `"{root_turn_id}:agent-frame:{completed_turn_count}"`, so each count
/// maps to a distinct, reproducible id.
fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
    match completed_turn_count {
        0 => root_turn_id.to_owned(),
        follow_index => format!("{root_turn_id}:agent-frame:{follow_index}"),
    }
}
1802
1803pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1804    if input.protocol_extension.is_some() {
1805        return Err(RuntimeError::new(
1806            RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1807            "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1808        ));
1809    }
1810    input
1811        .turn_context
1812        .live_plugin_inputs()
1813        .durable_effect_rejection()?;
1814    Ok(())
1815}
1816
1817async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1818    if !events.is_noop() {
1819        events.emit(activity).await;
1820    }
1821}
1822
1823/// Pump the turn driver's event channel into the host sinks while the run
1824/// future executes, then drain any events emitted between completion and the
1825/// sender dropping.
1826///
1827/// Both the fresh and resumed turn entry points construct a
1828/// `RuntimeTurnDriver`, kick off its run future, and need identical
1829/// event-pump/drain behavior before tearing the driver down. Only the driver
1830/// construction and post-run teardown differ, so each caller owns those and
1831/// shares this loop.
1832async fn drive_turn_to_completion<F>(
1833    run_future: F,
1834    event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1835    assembler: &mut TurnAssembler,
1836    child_usage_event_relay: &ChildUsageEventRelay,
1837    events: &dyn EventSink,
1838    turn_events: &dyn TurnActivitySink,
1839) -> Result<(crate::MessageSequence, usize), RuntimeError>
1840where
1841    F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1842{
1843    let run_result = {
1844        let mut run_future = Box::pin(run_future);
1845        loop {
1846            tokio::select! {
1847                maybe_event = event_rx.recv() => {
1848                    if let Some(event) = maybe_event {
1849                        emit_runtime_stream_event_to_sinks(
1850                            events,
1851                            turn_events,
1852                            event,
1853                            assembler,
1854                        )
1855                        .await;
1856                    }
1857                }
1858                completed = run_future.as_mut() => {
1859                    child_usage_event_relay.clear();
1860                    break completed;
1861                }
1862            }
1863        }
1864    };
1865    while let Some(event) = event_rx.recv().await {
1866        emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1867    }
1868    run_result
1869}
1870
1871async fn emit_runtime_stream_event_to_sinks(
1872    events: &dyn EventSink,
1873    turn_events: &dyn TurnActivitySink,
1874    event: RuntimeStreamEvent,
1875    assembler: &mut TurnAssembler,
1876) {
1877    match event {
1878        RuntimeStreamEvent::Session(event) => {
1879            assembler.push(&event);
1880            emit_session_event_to_sink(events, event).await;
1881        }
1882        RuntimeStreamEvent::Turn(activity) => {
1883            assembler.push_turn_activity(&activity);
1884            emit_turn_activity_to_sink(turn_events, activity).await;
1885        }
1886    }
1887}
1888
#[cfg(test)]
mod tests {
    use super::agent_frame_follow_turn_id;

    /// The root turn keeps its id, and each follow-up count maps to its own
    /// suffixed id.
    #[test]
    fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
        let expectations = [
            (0, "root-turn"),
            (1, "root-turn:agent-frame:1"),
            (2, "root-turn:agent-frame:2"),
        ];
        for (completed_turn_count, expected_id) in expectations {
            assert_eq!(
                agent_frame_follow_turn_id("root-turn", completed_turn_count),
                expected_id
            );
        }
    }
}