Skip to main content

lash_core/runtime/
turn_loop.rs

1use super::*;
2
3fn trace_fields_from_outcome(
4    outcome: &TurnOutcome,
5) -> (&'static str, &'static str, Option<lash_trace::TraceHandoff>) {
6    match outcome {
7        TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
8            ("completed", "assistant_message", None)
9        }
10        TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
11            ("completed", "submitted_value", None)
12        }
13        TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
14        TurnOutcome::Handoff { session_id } => (
15            "completed",
16            "handoff",
17            Some(lash_trace::TraceHandoff {
18                successor_session_id: session_id.clone(),
19            }),
20        ),
21        TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
22    }
23}
24
25fn trace_stop_reason(stop: &TurnStop) -> &'static str {
26    match stop {
27        TurnStop::Cancelled => "cancelled",
28        TurnStop::Incomplete => "incomplete",
29        TurnStop::InvalidInput => "invalid_input",
30        TurnStop::MaxTurns => "max_turns",
31        TurnStop::ToolFailure => "tool_failure",
32        TurnStop::ProviderError => "provider_error",
33        TurnStop::PluginAbort => "plugin_abort",
34        TurnStop::RuntimeError => "runtime_error",
35        TurnStop::SubmittedError { .. } => "submitted_error",
36        TurnStop::ToolError { .. } => "tool_error",
37    }
38}
39
40impl LashRuntime {
41    fn max_context_tokens(&self) -> usize {
42        self.policy
43            .max_context_tokens
44            .expect("lash runtime requires explicit max_context_tokens")
45    }
46    #[doc(hidden)]
47    pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
48        self.turn_phase_probe = Some(probe);
49    }
50
51    fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
52        if let Some(probe) = self.turn_phase_probe.as_ref() {
53            probe.begin(phase);
54        }
55    }
56
57    fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
58        if let Some(probe) = self.turn_phase_probe.as_ref() {
59            probe.end(phase);
60        }
61    }
62
63    /// Run a single turn and stream events to the host sink.
64    pub async fn stream_turn(
65        &mut self,
66        input: TurnInput,
67        events: &dyn EventSink,
68        cancel: CancellationToken,
69    ) -> Result<AssembledTurn, RuntimeError> {
70        self.stream_turn_with_semantic_events(input, events, &NoopTurnActivitySink, cancel)
71            .await
72    }
73
74    pub(crate) async fn stream_turn_with_semantic_events(
75        &mut self,
76        input: TurnInput,
77        events: &dyn EventSink,
78        turn_events: &dyn TurnActivitySink,
79        cancel: CancellationToken,
80    ) -> Result<AssembledTurn, RuntimeError> {
81        if let Some(execution_session_id) = self
82            .active_handoff_leaf(&self.state.session_id)
83            .await
84            .filter(|session_id| session_id != &self.state.session_id)
85        {
86            return self
87                .stream_turn_on_handoff_successor(
88                    execution_session_id,
89                    input,
90                    events,
91                    turn_events,
92                    cancel,
93                )
94                .await;
95        }
96        self.stream_turn_inner(input.clone(), events, turn_events, cancel.clone())
97            .await
98    }
99
100    async fn active_handoff_leaf(&self, session_id: &str) -> Option<String> {
101        let continuations = self.active_handoff_continuations.lock().await;
102        let mut current = session_id.to_string();
103        let mut seen = std::collections::HashSet::new();
104        while seen.insert(current.clone()) {
105            let Some(next) = continuations.get(&current).cloned() else {
106                return (current != session_id).then_some(current);
107            };
108            current = next;
109        }
110        None
111    }
112
113    async fn stream_turn_on_handoff_successor(
114        &mut self,
115        execution_session_id: String,
116        input: TurnInput,
117        events: &dyn EventSink,
118        turn_events: &dyn TurnActivitySink,
119        cancel: CancellationToken,
120    ) -> Result<AssembledTurn, RuntimeError> {
121        let runtime_handle = {
122            let registry = self.managed_sessions.lock().await;
123            registry.get(&execution_session_id).cloned()
124        }
125        .ok_or_else(|| RuntimeError {
126            code: "handoff_successor_missing".to_string(),
127            message: format!("active handoff session `{execution_session_id}` is unavailable"),
128        })?;
129        let mut runtime = runtime_handle.runtime.lock().await;
130        runtime.state.turn_index = self.state.turn_index;
131        let turn = runtime
132            .stream_turn_inner(input, events, turn_events, cancel)
133            .await?;
134        runtime_handle.publish_from(&runtime);
135        self.state.turn_index = turn.state.turn_index;
136        Ok(turn)
137    }
138
139    /// Stream one logical host turn, following foreground handoffs until a
140    /// non-handoff outcome is reached.
141    ///
142    /// RLM `continue_as` creates a successor session with queued first-turn
143    /// input. Hosts that only care about the benchmark/app answer should not
144    /// need to special-case that intermediate outcome; this helper activates
145    /// each successor and drives its queued first turn with the normal runtime
146    /// turn guards.
147    pub async fn stream_turn_following_handoffs(
148        &mut self,
149        input: TurnInput,
150        events: &dyn EventSink,
151        cancel: CancellationToken,
152    ) -> Result<FollowedTurn, RuntimeError> {
153        self.stream_turn_following_handoffs_with_semantic_events(
154            input,
155            events,
156            &NoopTurnActivitySink,
157            cancel,
158        )
159        .await
160    }
161
162    pub async fn stream_turn_events_following_handoffs(
163        &mut self,
164        input: TurnInput,
165        turn_events: &dyn TurnActivitySink,
166        cancel: CancellationToken,
167    ) -> Result<FollowedTurn, RuntimeError> {
168        self.stream_turn_following_handoffs_with_semantic_events(
169            input,
170            &NoopEventSink,
171            turn_events,
172            cancel,
173        )
174        .await
175    }
176
177    pub async fn stream_turn_with_events_following_handoffs(
178        &mut self,
179        input: TurnInput,
180        events: &dyn EventSink,
181        turn_events: &dyn TurnActivitySink,
182        cancel: CancellationToken,
183    ) -> Result<FollowedTurn, RuntimeError> {
184        self.stream_turn_following_handoffs_with_semantic_events(input, events, turn_events, cancel)
185            .await
186    }
187
188    async fn stream_turn_following_handoffs_with_semantic_events(
189        &mut self,
190        mut input: TurnInput,
191        events: &dyn EventSink,
192        turn_events: &dyn TurnActivitySink,
193        cancel: CancellationToken,
194    ) -> Result<FollowedTurn, RuntimeError> {
195        let follow_mode_turn_options = input.mode_turn_options.clone();
196        let follow_turn_context = input.turn_context.clone();
197        let follow_trace_turn_id = input
198            .trace_turn_id
199            .clone()
200            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
201        input.trace_turn_id = Some(follow_trace_turn_id.clone());
202        let mut turns = Vec::new();
203        loop {
204            let turn = self
205                .stream_turn_with_semantic_events(input, events, turn_events, cancel.clone())
206                .await?;
207            let successor_session_id = match &turn.outcome {
208                TurnOutcome::Handoff { session_id } => Some(session_id.clone()),
209                _ => None,
210            };
211            turns.push(turn);
212
213            let Some(successor_session_id) = successor_session_id else {
214                return Ok(FollowedTurn { turns });
215            };
216
217            let seed = self
218                .pending_first_turn_inputs
219                .lock()
220                .expect("pending first turn inputs lock")
221                .remove(&successor_session_id)
222                .ok_or_else(|| RuntimeError {
223                    code: "handoff_missing_first_turn".to_string(),
224                    message: format!(
225                        "handoff session `{successor_session_id}` did not provide a first turn"
226                    ),
227                })?;
228            input = turn_input_from_plugin_message(seed);
229            input.mode_turn_options = follow_mode_turn_options.clone();
230            input.trace_turn_id = Some(follow_trace_turn_id.clone());
231            input.turn_context = follow_turn_context.clone();
232            if let Some(successor_handle) = {
233                let registry = self.managed_sessions.lock().await;
234                registry.get(&successor_session_id).cloned()
235            } {
236                let mut successor = successor_handle.runtime.lock().await;
237                successor.state.turn_index = self.state.turn_index.saturating_sub(1);
238                // Keep observers aligned if a handoff successor has not
239                // started its own streamed turn yet.
240                successor_handle.publish_from(&successor);
241            }
242        }
243    }
244
245    async fn stream_turn_inner(
246        &mut self,
247        input: TurnInput,
248        events: &dyn EventSink,
249        turn_events: &dyn TurnActivitySink,
250        cancel: CancellationToken,
251    ) -> Result<AssembledTurn, RuntimeError> {
252        self.refresh_session_graph_from_store().await;
253        if let Some(extension) = &input.mode_extension
254            && let Some(session) = self.session.as_ref()
255        {
256            let mode_session = std::sync::Arc::clone(session.plugins().mode_session());
257            mode_session
258                .validate_turn_extension(extension)
259                .await
260                .map_err(|err| RuntimeError {
261                    code: "mode_turn_extension".to_string(),
262                    message: err.to_string(),
263                })?;
264        }
265        let previous_prompt_usage = self.state.last_prompt_usage.clone();
266        let normalized = match self.normalize_input_items(&input.items, &input.image_blobs) {
267            Ok(items) => items,
268            Err(e) => {
269                self.state.last_prompt_usage = None;
270                let mut assembler = TurnAssembler::default();
271                let error_event = SessionEvent::Error {
272                    message: e.clone(),
273                    envelope: Some(crate::session_model::ErrorEnvelope {
274                        kind: "input_validation".to_string(),
275                        code: Some("invalid_turn_input".to_string()),
276                        terminal_reason: None,
277                        user_message: e.clone(),
278                        raw: None,
279                    }),
280                };
281                assembler.push(&error_event);
282                emit_turn_activity_to_sink(
283                    turn_events,
284                    TurnActivity::independent(TurnEvent::Error { message: e }),
285                )
286                .await;
287                emit_session_event_to_sink(events, error_event).await;
288                let outcome_event = SessionEvent::TurnOutcome {
289                    outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
290                };
291                assembler.push(&outcome_event);
292                emit_session_event_to_sink(events, outcome_event).await;
293                assembler.push(&SessionEvent::Done);
294                emit_session_event_to_sink(events, SessionEvent::Done).await;
295                return Ok(assembler.finish(
296                    self.state.export_state(),
297                    false,
298                    None,
299                    &self.host.core.termination,
300                ));
301            }
302        };
303        let turn_index = self.state.turn_index + 1;
304        let trace_turn_id = input
305            .trace_turn_id
306            .clone()
307            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
308        if self.host.core.trace_sink.is_some() {
309            let mut trace_metadata = std::collections::BTreeMap::new();
310            trace_metadata.insert(
311                "input_item_count".to_string(),
312                serde_json::json!(normalized.len()),
313            );
314            crate::trace::emit_trace(
315                &self.host.core.trace_sink,
316                &self.host.core.trace_context,
317                lash_trace::TraceContext::default()
318                    .for_session(self.state.session_id.clone())
319                    .for_turn_index(turn_index)
320                    .for_turn(trace_turn_id.clone()),
321                lash_trace::TraceEvent::TurnStarted {
322                    metadata: trace_metadata,
323                },
324            );
325        }
326
327        let base_read_model = self.state.read_model();
328        let base_messages = base_read_model.messages;
329        let base_render_cache = base_read_model.prompt_render_cache;
330        let mut turn_delta = Vec::new();
331
332        let user_id = fresh_message_id();
333        let mut user_parts: Vec<Part> = Vec::new();
334        for item in normalized {
335            match item {
336                NormalizedItem::Text(text) => {
337                    if text.is_empty() {
338                        continue;
339                    }
340                    user_parts.push(Part {
341                        id: format!("{}.p{}", user_id, user_parts.len()),
342                        kind: PartKind::Text,
343                        content: text,
344                        attachment: None,
345                        tool_call_id: None,
346                        tool_name: None,
347                        tool_replay: None,
348                        prune_state: PruneState::Intact,
349                        reasoning_meta: None,
350                        response_meta: None,
351                    });
352                }
353                NormalizedItem::Image(reference) => {
354                    user_parts.push(Part {
355                        id: format!("{}.p{}", user_id, user_parts.len()),
356                        kind: PartKind::Image,
357                        content: String::new(),
358                        attachment: Some(crate::session_model::message::PartAttachment {
359                            reference,
360                        }),
361                        tool_call_id: None,
362                        tool_name: None,
363                        tool_replay: None,
364                        prune_state: PruneState::Intact,
365                        reasoning_meta: None,
366                        response_meta: None,
367                    });
368                }
369            }
370        }
371        if user_parts.is_empty() {
372            user_parts.push(Part {
373                id: format!("{}.p0", user_id),
374                kind: PartKind::Text,
375                content: String::new(),
376                attachment: None,
377                tool_call_id: None,
378                tool_name: None,
379                tool_replay: None,
380                prune_state: PruneState::Intact,
381                reasoning_meta: None,
382                response_meta: None,
383            });
384        }
385        reassign_part_ids(&user_id, &mut user_parts);
386        turn_delta.push(Message {
387            id: user_id.clone(),
388            role: MessageRole::User,
389            parts: shared_parts(user_parts),
390            origin: None,
391        });
392
393        let manager = self
394            .runtime_session_manager_for_turn(None)
395            .map_err(|err| RuntimeError {
396                code: "plugin_session_manager".to_string(),
397                message: err.to_string(),
398            })?;
399        let plugin_session = self
400            .session
401            .as_ref()
402            .map(|s| Arc::clone(s.plugins()))
403            .ok_or_else(|| RuntimeError {
404                code: "context_prepare_turn".to_string(),
405                message: "runtime session not available".to_string(),
406            })?;
407        let turn_ctx = crate::TurnTransformContext {
408            session_id: self.state.session_id.clone(),
409            state: self.read_view(),
410            prompt_usage: previous_prompt_usage.clone(),
411            max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
412            host: manager.clone(),
413        };
414        self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
415        let prepared_context = plugin_session
416            .prepare_turn_context(
417                &turn_ctx,
418                crate::session_model::context::PreparedContext {
419                    messages: crate::MessageSequence::from_base_and_delta(
420                        base_messages,
421                        turn_delta,
422                    )
423                    .with_base_render_cache(base_render_cache),
424                    ..Default::default()
425                },
426            )
427            .await
428            .map_err(|err| RuntimeError {
429                code: "context_prepare_turn".to_string(),
430                message: err.to_string(),
431            })?;
432        self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
433        // Release the read-view's graph clone before the rest of the turn
434        // runs. Keeping it alive into `stream_prepared_turn` forces the
435        // post-turn `append_active_read_delta` to deep-clone the session
436        // graph (Arc::make_mut with refcount > 1).
437        drop(turn_ctx);
438        let messages = prepared_context.messages;
439        if let Some(session) = self.session.as_mut() {
440            session.set_context_surface(
441                prepared_context.tool_providers,
442                prepared_context.prompt_contributions,
443                prepared_context.include_base_tools,
444            );
445        }
446
447        self.state.last_prompt_usage = None;
448
449        self.stream_prepared_turn(
450            messages,
451            previous_prompt_usage,
452            input.mode_turn_options.clone(),
453            input.mode_extension.clone(),
454            input.turn_context.clone(),
455            trace_turn_id,
456            turn_index,
457            events,
458            turn_events,
459            cancel,
460        )
461        .await
462    }
463
464    /// Run a single turn and return only the assembled terminal result.
465    pub async fn run_turn_assembled(
466        &mut self,
467        input: TurnInput,
468        cancel: CancellationToken,
469    ) -> Result<AssembledTurn, RuntimeError> {
470        self.stream_turn(input, &NoopEventSink, cancel).await
471    }
472
473    /// Run a turn using host-prepared message history.
474    #[allow(clippy::too_many_arguments)]
475    pub async fn stream_prepared_turn(
476        &mut self,
477        messages: crate::MessageSequence,
478        _previous_prompt_usage: Option<PromptUsage>,
479        mode_turn_options: Option<crate::ModeTurnOptions>,
480        mode_extension: Option<crate::ModeTurnExtensionHandle>,
481        turn_context: crate::TurnContext,
482        trace_turn_id: String,
483        turn_index: usize,
484        events: &dyn EventSink,
485        turn_events: &dyn TurnActivitySink,
486        cancel: CancellationToken,
487    ) -> Result<AssembledTurn, RuntimeError> {
488        let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
489        let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
490        let mut turn_policy = self.policy.clone();
491        if let Some(provider) = turn_context.provider().cloned() {
492            let model = provider.default_model().to_string();
493            let model_variant = provider.default_model_variant(&model).map(str::to_string);
494            turn_policy.provider = provider;
495            turn_policy.model = model;
496            turn_policy.model_variant = model_variant;
497        }
498        if let Some((model, variant)) = turn_context.model_selection() {
499            turn_policy.model = model.to_string();
500            turn_policy.model_variant = variant.map(str::to_string);
501        }
502        let effective_mode_turn_options = mode_turn_options
503            .clone()
504            .unwrap_or_else(|| self.mode_turn_options.clone());
505        let manager = self
506            .runtime_session_manager_for_turn(Some(child_usage_event_relay.clone()))
507            .map_err(|err| RuntimeError {
508                code: "plugin_session_manager".to_string(),
509                message: err.to_string(),
510            })?;
511        let plugins = {
512            let session = self
513                .session
514                .as_ref()
515                .expect("lash runtime session must be available");
516            Arc::clone(session.plugins())
517        };
518        let capture_text_deltas =
519            turn_policy.provider.requires_streaming() || plugins.has_assistant_stream_hooks();
520        let mut assembler = TurnAssembler::new(capture_text_deltas);
521        self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
522        // Block-scope the pinned future so it (and its captured
523        // `SessionReadView` clone of the session graph) drops before the
524        // post-turn `append_active_read_delta` mutation. Keeping it alive
525        // across the turn forces `Arc::make_mut` to deep-clone
526        // `SessionGraphData`.
527        let prepared = {
528            let prepare_turn = plugins.prepare_turn(PrepareTurnRequest {
529                session_id: self.state.session_id.clone(),
530                state: crate::SessionReadView::from_runtime_state(
531                    &self.state,
532                    turn_policy.clone(),
533                    effective_mode_turn_options.clone(),
534                ),
535                messages,
536                host: manager.clone(),
537                turn_context: turn_context.clone(),
538            });
539            tokio::pin!(prepare_turn);
540
541            loop {
542                tokio::select! {
543                    prepared = &mut prepare_turn => {
544                        let prepared = prepared.map_err(|err| RuntimeError {
545                            code: "plugin_prepare_turn".to_string(),
546                            message: err.to_string(),
547                        })?;
548                        self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
549                        break prepared;
550                    }
551                    maybe_event = event_rx.recv() => {
552                        if let Some(event) = maybe_event {
553                            emit_runtime_stream_event_to_sinks(
554                                events,
555                                turn_events,
556                                event,
557                                &mut assembler,
558                            )
559                            .await;
560                        }
561                    }
562                }
563            }
564        };
565        for event in &prepared.events {
566            assembler.push(event);
567        }
568        emit_session_events_to_sink(events, prepared.events).await;
569        if let Some(abort) = prepared.abort {
570            drop(event_tx);
571
572            let mut turn_pipeline = TurnCommitPipeline::from_state(self.state.clone());
573            turn_pipeline.apply_prepared_messages(&prepared.messages);
574            let state = turn_pipeline.into_final_state();
575            let issue = TurnIssue {
576                kind: "plugin".to_string(),
577                code: Some(abort.code),
578                terminal_reason: None,
579                message: abort.message.clone(),
580                raw: None,
581            };
582            let error_event = SessionEvent::Error {
583                message: abort.message,
584                envelope: Some(crate::session_model::ErrorEnvelope {
585                    kind: "plugin".to_string(),
586                    code: issue.code.clone(),
587                    terminal_reason: None,
588                    user_message: issue.message.clone(),
589                    raw: None,
590                }),
591            };
592            assembler.push(&error_event);
593            emit_turn_activity_to_sink(
594                turn_events,
595                TurnActivity::independent(TurnEvent::Error {
596                    message: issue.message.clone(),
597                }),
598            )
599            .await;
600            emit_session_event_to_sink(events, error_event).await;
601            let outcome_event = SessionEvent::TurnOutcome {
602                outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
603            };
604            assembler.push(&outcome_event);
605            emit_session_event_to_sink(events, outcome_event).await;
606            assembler.push(&SessionEvent::Done);
607            emit_session_event_to_sink(events, SessionEvent::Done).await;
608            return Ok(assembler.finish(
609                state.export_state(),
610                cancel.is_cancelled(),
611                Some(issue),
612                &self.host.core.termination,
613            ));
614        }
615        let mut turn_pipeline = TurnCommitPipeline::from_state(self.state.clone());
616        let store = self
617            .session
618            .as_ref()
619            .and_then(|session| session.history_store());
620        turn_pipeline
621            .prepared_checkpoint(
622                store.as_ref().map(|store| store.as_ref()),
623                turn_policy.clone(),
624                turn_index,
625                &prepared.messages,
626                self.session.as_mut(),
627            )
628            .await
629            .map_err(|err| RuntimeError {
630                code: "store_commit_failed".to_string(),
631                message: err.to_string(),
632            })?;
633        let cancel_state = cancel.clone();
634        let session = self
635            .session
636            .take()
637            .expect("lash runtime session must be available");
638        let mut driver = RuntimeTurnDriver {
639            session,
640            policy: turn_policy.clone(),
641            host: self.host.clone(),
642            session_id: self.state.session_id.clone(),
643            turn_id: trace_turn_id.clone(),
644            turn_index,
645            turn_pipeline,
646            llm_stream_summaries: HashMap::new(),
647            next_llm_ordinal: 0,
648            session_manager: manager,
649            mode_turn_options: effective_mode_turn_options,
650            mode_extension,
651            turn_context,
652            turn_phase_probe: self.turn_phase_probe.clone(),
653        };
654        let mode_run_offset = 0;
655        let run_task = tokio::spawn(async move {
656            let (new_messages, new_mode_iteration) = driver
657                .run(prepared.messages, event_tx, cancel, mode_run_offset)
658                .await;
659            (driver, new_messages, new_mode_iteration)
660        });
661        tokio::pin!(run_task);
662
663        self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
664        let (driver, new_messages, _new_mode_iteration) = loop {
665            tokio::select! {
666                maybe_event = event_rx.recv() => {
667                    if let Some(event) = maybe_event {
668                        emit_runtime_stream_event_to_sinks(
669                            events,
670                            turn_events,
671                            event,
672                            &mut assembler,
673                        )
674                        .await;
675                    }
676                }
677                joined = &mut run_task => {
678                    child_usage_event_relay.clear();
679                    let joined = match joined {
680                        Ok(v) => v,
681                        Err(e) => {
682                            let issue = TurnIssue {
683                                kind: "runtime".to_string(),
684                                code: Some("run_task_join_failed".to_string()),
685                                terminal_reason: None,
686                                message: format!("Runtime turn task failed: {e}"),
687                                raw: None,
688                            };
689                            return Ok(assembler.finish(
690                                self.state.export_state(),
691                                cancel_state.is_cancelled(),
692                                Some(issue),
693                                &self.host.core.termination,
694                            ));
695                        }
696                    };
697                    break joined;
698                }
699            }
700        };
701        while let Some(event) = event_rx.recv().await {
702            emit_runtime_stream_event_to_sinks(events, turn_events, event, &mut assembler).await;
703        }
704        self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
705        tracing::debug!(
706            rss_kb = debug_rss_kb(),
707            new_message_count = new_messages.len(),
708            tool_call_count = assembler.tool_calls.len(),
709            "runtime post-run_task"
710        );
711
712        // Drain the shared token ledger (child sessions + direct
713        // completions + async OM observers/reflectors). Merge it after
714        // restoring the latest progress checkpoint state so in-turn
715        // progress commits cannot wipe live child usage.
716        let child_ledger = {
717            let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
718            std::mem::take(&mut *ledger)
719        };
720
721        let RuntimeTurnDriver {
722            session,
723            policy,
724            mut turn_pipeline,
725            ..
726        } = driver;
727        self.session = Some(session);
728        self.policy = self.state.policy.clone();
729        turn_pipeline.state_mut().policy = self.policy.clone();
730        turn_pipeline.state_mut().turn_index = turn_index;
731        let mut turn_usage_delta = child_ledger.clone();
732        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
733            let entry = TokenLedgerEntry {
734                source: "turn".to_string(),
735                model: self.policy.model.clone(),
736                usage: assembler.token_usage.clone(),
737            };
738            turn_usage_delta.push(entry);
739        }
740        let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
741        turn_pipeline.finalize_turn_read_state(
742            new_messages,
743            &assembler.tool_calls,
744            cancel_state.is_cancelled(),
745        );
746        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
747            turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
748        }
749
750        let last_prompt_usage = assembler
751            .last_llm_usage()
752            .and_then(|usage| normalize_prompt_usage(&policy.provider, usage));
753        let finalize_manager = if self.session.is_some() {
754            Some(
755                self.runtime_session_manager_for_turn(None)
756                    .map_err(|err| RuntimeError {
757                        code: "plugin_session_manager".to_string(),
758                        message: err.to_string(),
759                    })?,
760            )
761        } else {
762            None
763        };
764        tracing::debug!(
765            rss_kb = debug_rss_kb(),
766            state_message_count = turn_pipeline.state_mut().read_model().messages.len(),
767            graph_node_count = turn_pipeline.state_mut().session_graph.nodes.len(),
768            token_ledger_entries = turn_pipeline.state_mut().token_ledger.len(),
769            "runtime before assembler.finish"
770        );
771        turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage.clone();
772        let assembled_state = turn_pipeline.export_state_for_assembly();
773        let assembled = assembler.finish(
774            assembled_state,
775            cancel_state.is_cancelled(),
776            None,
777            &self.host.core.termination,
778        );
779        tracing::debug!(
780            rss_kb = debug_rss_kb(),
781            assembled_message_count = assembled.state.read_model().messages.len(),
782            assembled_graph_node_count = assembled.state.session_graph.nodes.len(),
783            "runtime after assembler.finish"
784        );
785        if let Some(session) = self.session.as_ref() {
786            let plugins = Arc::clone(session.plugins());
787            let manager = finalize_manager.expect("finalize manager should exist with session");
788            tracing::debug!(rss_kb = debug_rss_kb(), "runtime before finalize_turn");
789            self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
790            let finalized = plugins
791                .finalize_turn(assembled, manager)
792                .await
793                .map_err(|err| RuntimeError {
794                    code: "plugin_finalize_turn".to_string(),
795                    message: err.to_string(),
796                })?;
797            self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
798            tracing::debug!(
799                rss_kb = debug_rss_kb(),
800                finalized_message_count = finalized.turn.state.read_model().messages.len(),
801                "runtime after finalize_turn"
802            );
803            let mut returned_turn = finalized.turn;
804            tracing::debug!(
805                rss_kb = debug_rss_kb(),
806                tool_state_present = turn_pipeline.state_mut().tool_state_ref.is_some()
807                    || turn_pipeline.state_mut().tool_state_snapshot.is_some(),
808                plugin_snapshot_present = turn_pipeline.state_mut().plugin_snapshot_ref.is_some()
809                    || turn_pipeline.state_mut().plugin_snapshot.is_some(),
810                "runtime before stamp_runtime_state"
811            );
812            self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
813            turn_pipeline
814                .final_commit(&mut returned_turn, self.session.as_mut(), &turn_usage_delta)
815                .await?;
816            tracing::debug!(
817                rss_kb = debug_rss_kb(),
818                resident_graph_node_count = returned_turn.state.session_graph.nodes.len(),
819                persisted_message_count = returned_turn.state.read_model().messages.len(),
820                "runtime after stamp_runtime_state"
821            );
822            emit_session_events_to_sink(events, finalized.events).await;
823            self.state = turn_pipeline.into_final_state();
824            if let Some(session) = self.session.as_ref()
825                && let Ok(host) = self.runtime_session_manager()
826            {
827                session
828                    .plugins()
829                    .emit_runtime_event(crate::PluginRuntimeEvent::TurnPersisted(
830                        crate::SessionStateChangedContext {
831                            session_id: self.state.session_id.clone(),
832                            state: crate::SessionReadView::from_exported_state(
833                                &returned_turn.state,
834                            ),
835                            host,
836                        },
837                    ))
838                    .await;
839            }
840            self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
841            if self.host.core.trace_sink.is_some() {
842                let (status, done_reason, handoff) =
843                    trace_fields_from_outcome(&returned_turn.outcome);
844                crate::trace::emit_trace(
845                    &self.host.core.trace_sink,
846                    &self.host.core.trace_context,
847                    lash_trace::TraceContext::default()
848                        .for_session(returned_turn.state.session_id.clone())
849                        .for_turn_index(returned_turn.state.turn_index)
850                        .for_turn(trace_turn_id.clone()),
851                    lash_trace::TraceEvent::TurnCompleted {
852                        status: status.to_string(),
853                        done_reason: done_reason.to_string(),
854                        handoff,
855                    },
856                );
857            }
858            Ok(returned_turn)
859        } else {
860            self.state.apply_exported_state(&assembled.state);
861            if self.host.core.trace_sink.is_some() {
862                let (status, done_reason, handoff) = trace_fields_from_outcome(&assembled.outcome);
863                crate::trace::emit_trace(
864                    &self.host.core.trace_sink,
865                    &self.host.core.trace_context,
866                    lash_trace::TraceContext::default()
867                        .for_session(assembled.state.session_id.clone())
868                        .for_turn_index(assembled.state.turn_index)
869                        .for_turn(trace_turn_id),
870                    lash_trace::TraceEvent::TurnCompleted {
871                        status: status.to_string(),
872                        done_reason: done_reason.to_string(),
873                        handoff,
874                    },
875                );
876            }
877            Ok(assembled)
878        }
879    }
880    fn normalize_input_items(
881        &self,
882        items: &[InputItem],
883        image_blobs: &HashMap<String, Vec<u8>>,
884    ) -> Result<Vec<NormalizedItem>, String> {
885        normalize_input_items(items, image_blobs, self.host.core.attachment_store.as_ref())
886    }
887}
888
889fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
890    let mut items = Vec::new();
891    if !message.content.is_empty() {
892        items.push(InputItem::Text {
893            text: message.content,
894        });
895    }
896    let mut image_blobs = HashMap::new();
897    for (index, bytes) in message.images.into_iter().enumerate() {
898        let id = format!("handoff-seed-image-{index}");
899        image_blobs.insert(id.clone(), bytes);
900        items.push(InputItem::ImageRef { id });
901    }
902    TurnInput {
903        items,
904        image_blobs,
905        mode_turn_options: None,
906        trace_turn_id: None,
907        mode_extension: None,
908        turn_context: crate::TurnContext::default(),
909    }
910}
911
912async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
913    if !events.is_noop() {
914        events.emit(activity).await;
915    }
916}
917
918async fn emit_runtime_stream_event_to_sinks(
919    events: &dyn EventSink,
920    turn_events: &dyn TurnActivitySink,
921    event: RuntimeStreamEvent,
922    assembler: &mut TurnAssembler,
923) {
924    match event {
925        RuntimeStreamEvent::Session(event) => {
926            assembler.push(&event);
927            emit_session_event_to_sink(events, event).await;
928        }
929        RuntimeStreamEvent::Turn(activity) => {
930            assembler.push_turn_activity(&activity);
931            emit_turn_activity_to_sink(turn_events, activity).await;
932        }
933    }
934}