1use super::*;
2
3fn trace_fields_from_outcome(
4 outcome: &TurnOutcome,
5) -> (&'static str, &'static str, Option<lash_trace::TraceHandoff>) {
6 match outcome {
7 TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
8 ("completed", "assistant_message", None)
9 }
10 TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
11 ("completed", "submitted_value", None)
12 }
13 TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
14 TurnOutcome::Handoff { session_id } => (
15 "completed",
16 "handoff",
17 Some(lash_trace::TraceHandoff {
18 successor_session_id: session_id.clone(),
19 }),
20 ),
21 TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
22 }
23}
24
25fn trace_stop_reason(stop: &TurnStop) -> &'static str {
26 match stop {
27 TurnStop::Cancelled => "cancelled",
28 TurnStop::Incomplete => "incomplete",
29 TurnStop::InvalidInput => "invalid_input",
30 TurnStop::MaxTurns => "max_turns",
31 TurnStop::ToolFailure => "tool_failure",
32 TurnStop::ProviderError => "provider_error",
33 TurnStop::PluginAbort => "plugin_abort",
34 TurnStop::RuntimeError => "runtime_error",
35 TurnStop::SubmittedError { .. } => "submitted_error",
36 TurnStop::ToolError { .. } => "tool_error",
37 }
38}
39
40impl LashRuntime {
41 fn max_context_tokens(&self) -> usize {
42 self.policy
43 .max_context_tokens
44 .expect("lash runtime requires explicit max_context_tokens")
45 }
46 #[doc(hidden)]
47 pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
48 self.turn_phase_probe = Some(probe);
49 }
50
51 fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
52 if let Some(probe) = self.turn_phase_probe.as_ref() {
53 probe.begin(phase);
54 }
55 }
56
57 fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
58 if let Some(probe) = self.turn_phase_probe.as_ref() {
59 probe.end(phase);
60 }
61 }
62
63 pub async fn stream_turn(
65 &mut self,
66 input: TurnInput,
67 events: &dyn EventSink,
68 cancel: CancellationToken,
69 ) -> Result<AssembledTurn, RuntimeError> {
70 self.stream_turn_with_semantic_events(input, events, &NoopTurnActivitySink, cancel)
71 .await
72 }
73
74 pub(crate) async fn stream_turn_with_semantic_events(
75 &mut self,
76 input: TurnInput,
77 events: &dyn EventSink,
78 turn_events: &dyn TurnActivitySink,
79 cancel: CancellationToken,
80 ) -> Result<AssembledTurn, RuntimeError> {
81 if let Some(execution_session_id) = self
82 .active_handoff_leaf(&self.state.session_id)
83 .await
84 .filter(|session_id| session_id != &self.state.session_id)
85 {
86 return self
87 .stream_turn_on_handoff_successor(
88 execution_session_id,
89 input,
90 events,
91 turn_events,
92 cancel,
93 )
94 .await;
95 }
96 self.stream_turn_inner(input.clone(), events, turn_events, cancel.clone())
97 .await
98 }
99
100 async fn active_handoff_leaf(&self, session_id: &str) -> Option<String> {
101 let continuations = self.active_handoff_continuations.lock().await;
102 let mut current = session_id.to_string();
103 let mut seen = std::collections::HashSet::new();
104 while seen.insert(current.clone()) {
105 let Some(next) = continuations.get(¤t).cloned() else {
106 return (current != session_id).then_some(current);
107 };
108 current = next;
109 }
110 None
111 }
112
113 async fn stream_turn_on_handoff_successor(
114 &mut self,
115 execution_session_id: String,
116 input: TurnInput,
117 events: &dyn EventSink,
118 turn_events: &dyn TurnActivitySink,
119 cancel: CancellationToken,
120 ) -> Result<AssembledTurn, RuntimeError> {
121 let runtime_handle = {
122 let registry = self.managed_sessions.lock().await;
123 registry.get(&execution_session_id).cloned()
124 }
125 .ok_or_else(|| RuntimeError {
126 code: "handoff_successor_missing".to_string(),
127 message: format!("active handoff session `{execution_session_id}` is unavailable"),
128 })?;
129 let mut runtime = runtime_handle.runtime.lock().await;
130 runtime.state.turn_index = self.state.turn_index;
131 let turn = runtime
132 .stream_turn_inner(input, events, turn_events, cancel)
133 .await?;
134 runtime_handle.publish_from(&runtime);
135 self.state.turn_index = turn.state.turn_index;
136 Ok(turn)
137 }
138
139 pub async fn stream_turn_following_handoffs(
148 &mut self,
149 input: TurnInput,
150 events: &dyn EventSink,
151 cancel: CancellationToken,
152 ) -> Result<FollowedTurn, RuntimeError> {
153 self.stream_turn_following_handoffs_with_semantic_events(
154 input,
155 events,
156 &NoopTurnActivitySink,
157 cancel,
158 )
159 .await
160 }
161
162 pub async fn stream_turn_events_following_handoffs(
163 &mut self,
164 input: TurnInput,
165 turn_events: &dyn TurnActivitySink,
166 cancel: CancellationToken,
167 ) -> Result<FollowedTurn, RuntimeError> {
168 self.stream_turn_following_handoffs_with_semantic_events(
169 input,
170 &NoopEventSink,
171 turn_events,
172 cancel,
173 )
174 .await
175 }
176
177 pub async fn stream_turn_with_events_following_handoffs(
178 &mut self,
179 input: TurnInput,
180 events: &dyn EventSink,
181 turn_events: &dyn TurnActivitySink,
182 cancel: CancellationToken,
183 ) -> Result<FollowedTurn, RuntimeError> {
184 self.stream_turn_following_handoffs_with_semantic_events(input, events, turn_events, cancel)
185 .await
186 }
187
188 async fn stream_turn_following_handoffs_with_semantic_events(
189 &mut self,
190 mut input: TurnInput,
191 events: &dyn EventSink,
192 turn_events: &dyn TurnActivitySink,
193 cancel: CancellationToken,
194 ) -> Result<FollowedTurn, RuntimeError> {
195 let follow_mode_turn_options = input.mode_turn_options.clone();
196 let follow_turn_context = input.turn_context.clone();
197 let follow_trace_turn_id = input
198 .trace_turn_id
199 .clone()
200 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
201 input.trace_turn_id = Some(follow_trace_turn_id.clone());
202 let mut turns = Vec::new();
203 loop {
204 let turn = self
205 .stream_turn_with_semantic_events(input, events, turn_events, cancel.clone())
206 .await?;
207 let successor_session_id = match &turn.outcome {
208 TurnOutcome::Handoff { session_id } => Some(session_id.clone()),
209 _ => None,
210 };
211 turns.push(turn);
212
213 let Some(successor_session_id) = successor_session_id else {
214 return Ok(FollowedTurn { turns });
215 };
216
217 let seed = self
218 .pending_first_turn_inputs
219 .lock()
220 .expect("pending first turn inputs lock")
221 .remove(&successor_session_id)
222 .ok_or_else(|| RuntimeError {
223 code: "handoff_missing_first_turn".to_string(),
224 message: format!(
225 "handoff session `{successor_session_id}` did not provide a first turn"
226 ),
227 })?;
228 input = turn_input_from_plugin_message(seed);
229 input.mode_turn_options = follow_mode_turn_options.clone();
230 input.trace_turn_id = Some(follow_trace_turn_id.clone());
231 input.turn_context = follow_turn_context.clone();
232 if let Some(successor_handle) = {
233 let registry = self.managed_sessions.lock().await;
234 registry.get(&successor_session_id).cloned()
235 } {
236 let mut successor = successor_handle.runtime.lock().await;
237 successor.state.turn_index = self.state.turn_index.saturating_sub(1);
238 successor_handle.publish_from(&successor);
241 }
242 }
243 }
244
245 async fn stream_turn_inner(
246 &mut self,
247 input: TurnInput,
248 events: &dyn EventSink,
249 turn_events: &dyn TurnActivitySink,
250 cancel: CancellationToken,
251 ) -> Result<AssembledTurn, RuntimeError> {
252 self.refresh_session_graph_from_store().await;
253 if let Some(extension) = &input.mode_extension
254 && let Some(session) = self.session.as_ref()
255 {
256 let mode_session = std::sync::Arc::clone(session.plugins().mode_session());
257 mode_session
258 .validate_turn_extension(extension)
259 .await
260 .map_err(|err| RuntimeError {
261 code: "mode_turn_extension".to_string(),
262 message: err.to_string(),
263 })?;
264 }
265 let previous_prompt_usage = self.state.last_prompt_usage.clone();
266 let normalized = match self.normalize_input_items(&input.items, &input.image_blobs) {
267 Ok(items) => items,
268 Err(e) => {
269 self.state.last_prompt_usage = None;
270 let mut assembler = TurnAssembler::default();
271 let error_event = SessionEvent::Error {
272 message: e.clone(),
273 envelope: Some(crate::session_model::ErrorEnvelope {
274 kind: "input_validation".to_string(),
275 code: Some("invalid_turn_input".to_string()),
276 terminal_reason: None,
277 user_message: e.clone(),
278 raw: None,
279 }),
280 };
281 assembler.push(&error_event);
282 emit_turn_activity_to_sink(
283 turn_events,
284 TurnActivity::independent(TurnEvent::Error { message: e }),
285 )
286 .await;
287 emit_session_event_to_sink(events, error_event).await;
288 let outcome_event = SessionEvent::TurnOutcome {
289 outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
290 };
291 assembler.push(&outcome_event);
292 emit_session_event_to_sink(events, outcome_event).await;
293 assembler.push(&SessionEvent::Done);
294 emit_session_event_to_sink(events, SessionEvent::Done).await;
295 return Ok(assembler.finish(
296 self.state.export_state(),
297 false,
298 None,
299 &self.host.core.termination,
300 ));
301 }
302 };
303 let turn_index = self.state.turn_index + 1;
304 let trace_turn_id = input
305 .trace_turn_id
306 .clone()
307 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
308 if self.host.core.trace_sink.is_some() {
309 let mut trace_metadata = std::collections::BTreeMap::new();
310 trace_metadata.insert(
311 "input_item_count".to_string(),
312 serde_json::json!(normalized.len()),
313 );
314 crate::trace::emit_trace(
315 &self.host.core.trace_sink,
316 &self.host.core.trace_context,
317 lash_trace::TraceContext::default()
318 .for_session(self.state.session_id.clone())
319 .for_turn_index(turn_index)
320 .for_turn(trace_turn_id.clone()),
321 lash_trace::TraceEvent::TurnStarted {
322 metadata: trace_metadata,
323 },
324 );
325 }
326
327 let base_read_model = self.state.read_model();
328 let base_messages = base_read_model.messages;
329 let base_render_cache = base_read_model.prompt_render_cache;
330 let mut turn_delta = Vec::new();
331
332 let user_id = fresh_message_id();
333 let mut user_parts: Vec<Part> = Vec::new();
334 for item in normalized {
335 match item {
336 NormalizedItem::Text(text) => {
337 if text.is_empty() {
338 continue;
339 }
340 user_parts.push(Part {
341 id: format!("{}.p{}", user_id, user_parts.len()),
342 kind: PartKind::Text,
343 content: text,
344 attachment: None,
345 tool_call_id: None,
346 tool_name: None,
347 tool_replay: None,
348 prune_state: PruneState::Intact,
349 reasoning_meta: None,
350 response_meta: None,
351 });
352 }
353 NormalizedItem::Image(reference) => {
354 user_parts.push(Part {
355 id: format!("{}.p{}", user_id, user_parts.len()),
356 kind: PartKind::Image,
357 content: String::new(),
358 attachment: Some(crate::session_model::message::PartAttachment {
359 reference,
360 }),
361 tool_call_id: None,
362 tool_name: None,
363 tool_replay: None,
364 prune_state: PruneState::Intact,
365 reasoning_meta: None,
366 response_meta: None,
367 });
368 }
369 }
370 }
371 if user_parts.is_empty() {
372 user_parts.push(Part {
373 id: format!("{}.p0", user_id),
374 kind: PartKind::Text,
375 content: String::new(),
376 attachment: None,
377 tool_call_id: None,
378 tool_name: None,
379 tool_replay: None,
380 prune_state: PruneState::Intact,
381 reasoning_meta: None,
382 response_meta: None,
383 });
384 }
385 reassign_part_ids(&user_id, &mut user_parts);
386 turn_delta.push(Message {
387 id: user_id.clone(),
388 role: MessageRole::User,
389 parts: shared_parts(user_parts),
390 origin: None,
391 });
392
393 let manager = self
394 .runtime_session_manager_for_turn(None)
395 .map_err(|err| RuntimeError {
396 code: "plugin_session_manager".to_string(),
397 message: err.to_string(),
398 })?;
399 let plugin_session = self
400 .session
401 .as_ref()
402 .map(|s| Arc::clone(s.plugins()))
403 .ok_or_else(|| RuntimeError {
404 code: "context_prepare_turn".to_string(),
405 message: "runtime session not available".to_string(),
406 })?;
407 let turn_ctx = crate::TurnTransformContext {
408 session_id: self.state.session_id.clone(),
409 state: self.read_view(),
410 prompt_usage: previous_prompt_usage.clone(),
411 max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
412 host: manager.clone(),
413 };
414 self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
415 let prepared_context = plugin_session
416 .prepare_turn_context(
417 &turn_ctx,
418 crate::session_model::context::PreparedContext {
419 messages: crate::MessageSequence::from_base_and_delta(
420 base_messages,
421 turn_delta,
422 )
423 .with_base_render_cache(base_render_cache),
424 ..Default::default()
425 },
426 )
427 .await
428 .map_err(|err| RuntimeError {
429 code: "context_prepare_turn".to_string(),
430 message: err.to_string(),
431 })?;
432 self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
433 drop(turn_ctx);
438 let messages = prepared_context.messages;
439 if let Some(session) = self.session.as_mut() {
440 session.set_context_surface(
441 prepared_context.tool_providers,
442 prepared_context.prompt_contributions,
443 prepared_context.include_base_tools,
444 );
445 }
446
447 self.state.last_prompt_usage = None;
448
449 self.stream_prepared_turn(
450 messages,
451 previous_prompt_usage,
452 input.mode_turn_options.clone(),
453 input.mode_extension.clone(),
454 input.turn_context.clone(),
455 trace_turn_id,
456 turn_index,
457 events,
458 turn_events,
459 cancel,
460 )
461 .await
462 }
463
464 pub async fn run_turn_assembled(
466 &mut self,
467 input: TurnInput,
468 cancel: CancellationToken,
469 ) -> Result<AssembledTurn, RuntimeError> {
470 self.stream_turn(input, &NoopEventSink, cancel).await
471 }
472
473 #[allow(clippy::too_many_arguments)]
475 pub async fn stream_prepared_turn(
476 &mut self,
477 messages: crate::MessageSequence,
478 _previous_prompt_usage: Option<PromptUsage>,
479 mode_turn_options: Option<crate::ModeTurnOptions>,
480 mode_extension: Option<crate::ModeTurnExtensionHandle>,
481 turn_context: crate::TurnContext,
482 trace_turn_id: String,
483 turn_index: usize,
484 events: &dyn EventSink,
485 turn_events: &dyn TurnActivitySink,
486 cancel: CancellationToken,
487 ) -> Result<AssembledTurn, RuntimeError> {
488 let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
489 let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
490 let mut turn_policy = self.policy.clone();
491 if let Some(provider) = turn_context.provider().cloned() {
492 let model = provider.default_model().to_string();
493 let model_variant = provider.default_model_variant(&model).map(str::to_string);
494 turn_policy.provider = provider;
495 turn_policy.model = model;
496 turn_policy.model_variant = model_variant;
497 }
498 if let Some((model, variant)) = turn_context.model_selection() {
499 turn_policy.model = model.to_string();
500 turn_policy.model_variant = variant.map(str::to_string);
501 }
502 let effective_mode_turn_options = mode_turn_options
503 .clone()
504 .unwrap_or_else(|| self.mode_turn_options.clone());
505 let manager = self
506 .runtime_session_manager_for_turn(Some(child_usage_event_relay.clone()))
507 .map_err(|err| RuntimeError {
508 code: "plugin_session_manager".to_string(),
509 message: err.to_string(),
510 })?;
511 let plugins = {
512 let session = self
513 .session
514 .as_ref()
515 .expect("lash runtime session must be available");
516 Arc::clone(session.plugins())
517 };
518 let capture_text_deltas =
519 turn_policy.provider.requires_streaming() || plugins.has_assistant_stream_hooks();
520 let mut assembler = TurnAssembler::new(capture_text_deltas);
521 self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
522 let prepared = {
528 let prepare_turn = plugins.prepare_turn(PrepareTurnRequest {
529 session_id: self.state.session_id.clone(),
530 state: crate::SessionReadView::from_runtime_state(
531 &self.state,
532 turn_policy.clone(),
533 effective_mode_turn_options.clone(),
534 ),
535 messages,
536 host: manager.clone(),
537 turn_context: turn_context.clone(),
538 });
539 tokio::pin!(prepare_turn);
540
541 loop {
542 tokio::select! {
543 prepared = &mut prepare_turn => {
544 let prepared = prepared.map_err(|err| RuntimeError {
545 code: "plugin_prepare_turn".to_string(),
546 message: err.to_string(),
547 })?;
548 self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
549 break prepared;
550 }
551 maybe_event = event_rx.recv() => {
552 if let Some(event) = maybe_event {
553 emit_runtime_stream_event_to_sinks(
554 events,
555 turn_events,
556 event,
557 &mut assembler,
558 )
559 .await;
560 }
561 }
562 }
563 }
564 };
565 for event in &prepared.events {
566 assembler.push(event);
567 }
568 emit_session_events_to_sink(events, prepared.events).await;
569 if let Some(abort) = prepared.abort {
570 drop(event_tx);
571
572 let mut turn_pipeline = TurnCommitPipeline::from_state(self.state.clone());
573 turn_pipeline.apply_prepared_messages(&prepared.messages);
574 let state = turn_pipeline.into_final_state();
575 let issue = TurnIssue {
576 kind: "plugin".to_string(),
577 code: Some(abort.code),
578 terminal_reason: None,
579 message: abort.message.clone(),
580 raw: None,
581 };
582 let error_event = SessionEvent::Error {
583 message: abort.message,
584 envelope: Some(crate::session_model::ErrorEnvelope {
585 kind: "plugin".to_string(),
586 code: issue.code.clone(),
587 terminal_reason: None,
588 user_message: issue.message.clone(),
589 raw: None,
590 }),
591 };
592 assembler.push(&error_event);
593 emit_turn_activity_to_sink(
594 turn_events,
595 TurnActivity::independent(TurnEvent::Error {
596 message: issue.message.clone(),
597 }),
598 )
599 .await;
600 emit_session_event_to_sink(events, error_event).await;
601 let outcome_event = SessionEvent::TurnOutcome {
602 outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
603 };
604 assembler.push(&outcome_event);
605 emit_session_event_to_sink(events, outcome_event).await;
606 assembler.push(&SessionEvent::Done);
607 emit_session_event_to_sink(events, SessionEvent::Done).await;
608 return Ok(assembler.finish(
609 state.export_state(),
610 cancel.is_cancelled(),
611 Some(issue),
612 &self.host.core.termination,
613 ));
614 }
615 let mut turn_pipeline = TurnCommitPipeline::from_state(self.state.clone());
616 let store = self
617 .session
618 .as_ref()
619 .and_then(|session| session.history_store());
620 turn_pipeline
621 .prepared_checkpoint(
622 store.as_ref().map(|store| store.as_ref()),
623 turn_policy.clone(),
624 turn_index,
625 &prepared.messages,
626 self.session.as_mut(),
627 )
628 .await
629 .map_err(|err| RuntimeError {
630 code: "store_commit_failed".to_string(),
631 message: err.to_string(),
632 })?;
633 let cancel_state = cancel.clone();
634 let session = self
635 .session
636 .take()
637 .expect("lash runtime session must be available");
638 let mut driver = RuntimeTurnDriver {
639 session,
640 policy: turn_policy.clone(),
641 host: self.host.clone(),
642 session_id: self.state.session_id.clone(),
643 turn_id: trace_turn_id.clone(),
644 turn_index,
645 turn_pipeline,
646 llm_stream_summaries: HashMap::new(),
647 next_llm_ordinal: 0,
648 session_manager: manager,
649 mode_turn_options: effective_mode_turn_options,
650 mode_extension,
651 turn_context,
652 turn_phase_probe: self.turn_phase_probe.clone(),
653 };
654 let mode_run_offset = 0;
655 let run_task = tokio::spawn(async move {
656 let (new_messages, new_mode_iteration) = driver
657 .run(prepared.messages, event_tx, cancel, mode_run_offset)
658 .await;
659 (driver, new_messages, new_mode_iteration)
660 });
661 tokio::pin!(run_task);
662
663 self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
664 let (driver, new_messages, _new_mode_iteration) = loop {
665 tokio::select! {
666 maybe_event = event_rx.recv() => {
667 if let Some(event) = maybe_event {
668 emit_runtime_stream_event_to_sinks(
669 events,
670 turn_events,
671 event,
672 &mut assembler,
673 )
674 .await;
675 }
676 }
677 joined = &mut run_task => {
678 child_usage_event_relay.clear();
679 let joined = match joined {
680 Ok(v) => v,
681 Err(e) => {
682 let issue = TurnIssue {
683 kind: "runtime".to_string(),
684 code: Some("run_task_join_failed".to_string()),
685 terminal_reason: None,
686 message: format!("Runtime turn task failed: {e}"),
687 raw: None,
688 };
689 return Ok(assembler.finish(
690 self.state.export_state(),
691 cancel_state.is_cancelled(),
692 Some(issue),
693 &self.host.core.termination,
694 ));
695 }
696 };
697 break joined;
698 }
699 }
700 };
701 while let Some(event) = event_rx.recv().await {
702 emit_runtime_stream_event_to_sinks(events, turn_events, event, &mut assembler).await;
703 }
704 self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
705 tracing::debug!(
706 rss_kb = debug_rss_kb(),
707 new_message_count = new_messages.len(),
708 tool_call_count = assembler.tool_calls.len(),
709 "runtime post-run_task"
710 );
711
712 let child_ledger = {
717 let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
718 std::mem::take(&mut *ledger)
719 };
720
721 let RuntimeTurnDriver {
722 session,
723 policy,
724 mut turn_pipeline,
725 ..
726 } = driver;
727 self.session = Some(session);
728 self.policy = self.state.policy.clone();
729 turn_pipeline.state_mut().policy = self.policy.clone();
730 turn_pipeline.state_mut().turn_index = turn_index;
731 let mut turn_usage_delta = child_ledger.clone();
732 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
733 let entry = TokenLedgerEntry {
734 source: "turn".to_string(),
735 model: self.policy.model.clone(),
736 usage: assembler.token_usage.clone(),
737 };
738 turn_usage_delta.push(entry);
739 }
740 let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
741 turn_pipeline.finalize_turn_read_state(
742 new_messages,
743 &assembler.tool_calls,
744 cancel_state.is_cancelled(),
745 );
746 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
747 turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
748 }
749
750 let last_prompt_usage = assembler
751 .last_llm_usage()
752 .and_then(|usage| normalize_prompt_usage(&policy.provider, usage));
753 let finalize_manager = if self.session.is_some() {
754 Some(
755 self.runtime_session_manager_for_turn(None)
756 .map_err(|err| RuntimeError {
757 code: "plugin_session_manager".to_string(),
758 message: err.to_string(),
759 })?,
760 )
761 } else {
762 None
763 };
764 tracing::debug!(
765 rss_kb = debug_rss_kb(),
766 state_message_count = turn_pipeline.state_mut().read_model().messages.len(),
767 graph_node_count = turn_pipeline.state_mut().session_graph.nodes.len(),
768 token_ledger_entries = turn_pipeline.state_mut().token_ledger.len(),
769 "runtime before assembler.finish"
770 );
771 turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage.clone();
772 let assembled_state = turn_pipeline.export_state_for_assembly();
773 let assembled = assembler.finish(
774 assembled_state,
775 cancel_state.is_cancelled(),
776 None,
777 &self.host.core.termination,
778 );
779 tracing::debug!(
780 rss_kb = debug_rss_kb(),
781 assembled_message_count = assembled.state.read_model().messages.len(),
782 assembled_graph_node_count = assembled.state.session_graph.nodes.len(),
783 "runtime after assembler.finish"
784 );
785 if let Some(session) = self.session.as_ref() {
786 let plugins = Arc::clone(session.plugins());
787 let manager = finalize_manager.expect("finalize manager should exist with session");
788 tracing::debug!(rss_kb = debug_rss_kb(), "runtime before finalize_turn");
789 self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
790 let finalized = plugins
791 .finalize_turn(assembled, manager)
792 .await
793 .map_err(|err| RuntimeError {
794 code: "plugin_finalize_turn".to_string(),
795 message: err.to_string(),
796 })?;
797 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
798 tracing::debug!(
799 rss_kb = debug_rss_kb(),
800 finalized_message_count = finalized.turn.state.read_model().messages.len(),
801 "runtime after finalize_turn"
802 );
803 let mut returned_turn = finalized.turn;
804 tracing::debug!(
805 rss_kb = debug_rss_kb(),
806 tool_state_present = turn_pipeline.state_mut().tool_state_ref.is_some()
807 || turn_pipeline.state_mut().tool_state_snapshot.is_some(),
808 plugin_snapshot_present = turn_pipeline.state_mut().plugin_snapshot_ref.is_some()
809 || turn_pipeline.state_mut().plugin_snapshot.is_some(),
810 "runtime before stamp_runtime_state"
811 );
812 self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
813 turn_pipeline
814 .final_commit(&mut returned_turn, self.session.as_mut(), &turn_usage_delta)
815 .await?;
816 tracing::debug!(
817 rss_kb = debug_rss_kb(),
818 resident_graph_node_count = returned_turn.state.session_graph.nodes.len(),
819 persisted_message_count = returned_turn.state.read_model().messages.len(),
820 "runtime after stamp_runtime_state"
821 );
822 emit_session_events_to_sink(events, finalized.events).await;
823 self.state = turn_pipeline.into_final_state();
824 if let Some(session) = self.session.as_ref()
825 && let Ok(host) = self.runtime_session_manager()
826 {
827 session
828 .plugins()
829 .emit_runtime_event(crate::PluginRuntimeEvent::TurnPersisted(
830 crate::SessionStateChangedContext {
831 session_id: self.state.session_id.clone(),
832 state: crate::SessionReadView::from_exported_state(
833 &returned_turn.state,
834 ),
835 host,
836 },
837 ))
838 .await;
839 }
840 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
841 if self.host.core.trace_sink.is_some() {
842 let (status, done_reason, handoff) =
843 trace_fields_from_outcome(&returned_turn.outcome);
844 crate::trace::emit_trace(
845 &self.host.core.trace_sink,
846 &self.host.core.trace_context,
847 lash_trace::TraceContext::default()
848 .for_session(returned_turn.state.session_id.clone())
849 .for_turn_index(returned_turn.state.turn_index)
850 .for_turn(trace_turn_id.clone()),
851 lash_trace::TraceEvent::TurnCompleted {
852 status: status.to_string(),
853 done_reason: done_reason.to_string(),
854 handoff,
855 },
856 );
857 }
858 Ok(returned_turn)
859 } else {
860 self.state.apply_exported_state(&assembled.state);
861 if self.host.core.trace_sink.is_some() {
862 let (status, done_reason, handoff) = trace_fields_from_outcome(&assembled.outcome);
863 crate::trace::emit_trace(
864 &self.host.core.trace_sink,
865 &self.host.core.trace_context,
866 lash_trace::TraceContext::default()
867 .for_session(assembled.state.session_id.clone())
868 .for_turn_index(assembled.state.turn_index)
869 .for_turn(trace_turn_id),
870 lash_trace::TraceEvent::TurnCompleted {
871 status: status.to_string(),
872 done_reason: done_reason.to_string(),
873 handoff,
874 },
875 );
876 }
877 Ok(assembled)
878 }
879 }
880 fn normalize_input_items(
881 &self,
882 items: &[InputItem],
883 image_blobs: &HashMap<String, Vec<u8>>,
884 ) -> Result<Vec<NormalizedItem>, String> {
885 normalize_input_items(items, image_blobs, self.host.core.attachment_store.as_ref())
886 }
887}
888
889fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
890 let mut items = Vec::new();
891 if !message.content.is_empty() {
892 items.push(InputItem::Text {
893 text: message.content,
894 });
895 }
896 let mut image_blobs = HashMap::new();
897 for (index, bytes) in message.images.into_iter().enumerate() {
898 let id = format!("handoff-seed-image-{index}");
899 image_blobs.insert(id.clone(), bytes);
900 items.push(InputItem::ImageRef { id });
901 }
902 TurnInput {
903 items,
904 image_blobs,
905 mode_turn_options: None,
906 trace_turn_id: None,
907 mode_extension: None,
908 turn_context: crate::TurnContext::default(),
909 }
910}
911
912async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
913 if !events.is_noop() {
914 events.emit(activity).await;
915 }
916}
917
918async fn emit_runtime_stream_event_to_sinks(
919 events: &dyn EventSink,
920 turn_events: &dyn TurnActivitySink,
921 event: RuntimeStreamEvent,
922 assembler: &mut TurnAssembler,
923) {
924 match event {
925 RuntimeStreamEvent::Session(event) => {
926 assembler.push(&event);
927 emit_session_event_to_sink(events, event).await;
928 }
929 RuntimeStreamEvent::Turn(activity) => {
930 assembler.push_turn_activity(&activity);
931 emit_turn_activity_to_sink(turn_events, activity).await;
932 }
933 }
934}