1use super::*;
2
3fn trace_fields_from_outcome(
4 outcome: &TurnOutcome,
5) -> (
6 &'static str,
7 &'static str,
8 Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10 match outcome {
11 TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12 ("completed", "assistant_message", None)
13 }
14 TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
15 ("completed", "submitted_value", None)
16 }
17 TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
18 TurnOutcome::AgentFrameSwitch { frame_id, .. } => (
19 "completed",
20 "agent_frame_switch",
21 Some(lash_trace::TraceAgentFrameSwitch {
22 frame_id: frame_id.clone(),
23 }),
24 ),
25 TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
26 }
27}
28
29fn trace_stop_reason(stop: &TurnStop) -> &'static str {
30 match stop {
31 TurnStop::Cancelled => "cancelled",
32 TurnStop::Incomplete => "incomplete",
33 TurnStop::InvalidInput => "invalid_input",
34 TurnStop::MaxTurns => "max_turns",
35 TurnStop::ToolFailure => "tool_failure",
36 TurnStop::ProviderError => "provider_error",
37 TurnStop::PluginAbort => "plugin_abort",
38 TurnStop::RuntimeError => "runtime_error",
39 TurnStop::SubmittedError { .. } => "submitted_error",
40 TurnStop::ToolError { .. } => "tool_error",
41 }
42}
43
44fn session_head_refresh_error(err: SessionError) -> RuntimeError {
45 RuntimeError::new(
46 RuntimeErrorCode::Other("session_head_refresh".to_string()),
47 err.to_string(),
48 )
49}
50
51fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
52 match payload {
53 crate::QueuedWorkPayload::TurnInput { .. } => "turn_input",
54 crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
55 crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
56 }
57}
58
59fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
60 claim
61 .batches
62 .iter()
63 .map(|batch| batch.batch_id.clone())
64 .collect()
65}
66
/// Builds the id of a turn phase as `"<parent_turn_id>:<phase>"`.
fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
    let mut id = String::with_capacity(parent_turn_id.len() + 1 + phase.len());
    id.push_str(parent_turn_id);
    id.push(':');
    id.push_str(phase);
    id
}
70
71fn scoped_child_turn_controller<'run>(
72 scoped_effect_controller: &'run ScopedEffectController<'_>,
73 session_id: &str,
74 turn_id: &str,
75) -> Result<ScopedEffectController<'run>, RuntimeError> {
76 ScopedEffectController::borrowed(
77 scoped_effect_controller.controller(),
78 ExecutionScope::turn(session_id, turn_id),
79 )
80}
81
82pub(in crate::runtime) fn queued_work_trace_payload(
83 boundary: crate::QueuedWorkClaimBoundary,
84 claim: &crate::QueuedWorkClaim,
85 causes: &[crate::TurnCause],
86) -> serde_json::Value {
87 serde_json::json!({
88 "boundary": boundary,
89 "claim_id": claim.claim_id,
90 "owner_id": claim.owner_id,
91 "batch_ids": queued_work_batch_ids(claim),
92 "payload_types": claim.batches.iter()
93 .flat_map(|batch| batch.items.iter())
94 .map(|item| queued_work_payload_type(&item.payload))
95 .collect::<Vec<_>>(),
96 "causes": causes,
97 })
98}
99
100pub(in crate::runtime) fn queued_work_completion_trace_payload(
101 completions: &[crate::QueuedWorkCompletion],
102) -> serde_json::Value {
103 serde_json::json!({
104 "claims": completions.iter().map(|completion| {
105 serde_json::json!({
106 "session_id": completion.session_id,
107 "claim_id": completion.claim_id,
108 "batch_ids": completion.batch_ids,
109 })
110 }).collect::<Vec<_>>(),
111 })
112}
113
114async fn emit_queued_work_started_to_sink(
115 events: &dyn TurnActivitySink,
116 boundary: crate::QueuedWorkClaimBoundary,
117 claim: &crate::QueuedWorkClaim,
118 causes: Vec<crate::TurnCause>,
119) {
120 emit_turn_activity_to_sink(
121 events,
122 TurnActivity::independent(TurnEvent::QueuedWorkStarted {
123 boundary,
124 batch_ids: queued_work_batch_ids(claim),
125 causes,
126 }),
127 )
128 .await;
129}
130
131pub(in crate::runtime) async fn send_queued_work_started_event(
132 event_tx: &mpsc::Sender<RuntimeStreamEvent>,
133 boundary: crate::QueuedWorkClaimBoundary,
134 claim: &crate::QueuedWorkClaim,
135 causes: Vec<crate::TurnCause>,
136) {
137 send_turn_activity(
138 event_tx,
139 TurnActivityId::fresh(),
140 TurnEvent::QueuedWorkStarted {
141 boundary,
142 batch_ids: queued_work_batch_ids(claim),
143 causes,
144 },
145 )
146 .await;
147}
148
149struct TurnFinishInput {
150 turn_pipeline: TurnBoundary,
151 assembler: TurnAssembler,
152 new_messages: crate::MessageSequence,
153 policy: RuntimeSessionPolicy,
154 turn_index: usize,
155 queued_work_completions: Vec<crate::QueuedWorkCompletion>,
156 trace_turn_id: String,
157}
158
159impl LashRuntime {
160 fn max_context_tokens(&self) -> usize {
161 self.state.effective_policy().context_window_tokens()
162 }
163 #[doc(hidden)]
164 pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
165 self.turn_phase_probe = Some(probe);
166 }
167
168 fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
169 if let Some(probe) = self.turn_phase_probe.as_ref() {
170 probe.begin(phase);
171 }
172 }
173
174 fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
175 if let Some(probe) = self.turn_phase_probe.as_ref() {
176 probe.end(phase);
177 }
178 }
179
180 async fn finish_turn(
181 &mut self,
182 finish: TurnFinishInput,
183 events: &dyn EventSink,
184 scoped_effect_controller: &ScopedEffectController<'_>,
185 cancel_state: &CancellationToken,
186 ) -> Result<AssembledTurn, RuntimeError> {
187 let TurnFinishInput {
188 mut turn_pipeline,
189 assembler,
190 new_messages,
191 policy,
192 turn_index,
193 queued_work_completions,
194 trace_turn_id,
195 } = finish;
196 self.policy = self.state.effective_policy().clone();
197 turn_pipeline.state_mut().policy = self.policy.clone();
198 turn_pipeline.state_mut().turn_index = turn_index;
199
200 let mut turn_usage_delta = {
201 let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
202 std::mem::take(&mut *ledger)
203 };
204 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
205 turn_usage_delta.push(TokenLedgerEntry {
206 source: "turn".to_string(),
207 model: policy.model.id.clone(),
208 usage: assembler.token_usage.clone(),
209 });
210 }
211 let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
212
213 turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
214 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
215 turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
216 }
217
218 let last_prompt_usage = assembler
219 .last_llm_usage()
220 .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
221 turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
222 let assembled_state = turn_pipeline.export_state_for_assembly();
223 let assembled = assembler.finish(
224 assembled_state,
225 cancel_state.is_cancelled(),
226 None,
227 &self.host.core.control.termination,
228 );
229
230 let Some(session) = self.session.as_ref() else {
231 self.state.apply_snapshot(&assembled.state);
232 self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
233 return Ok(assembled);
234 };
235
236 let plugins = Arc::clone(session.plugins());
237 let manager = match self.runtime_session_services_for_turn(None) {
238 Ok(manager) => manager,
239 Err(err) => {
240 return Err(RuntimeError::new(
241 RuntimeErrorCode::PluginSessionManager,
242 err.to_string(),
243 ));
244 }
245 };
246
247 self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
248 let finalized = match plugins
249 .finalize_turn_with_phase_probe(
250 assembled,
251 manager.state_service(),
252 manager.lifecycle_service(),
253 manager.graph_service(),
254 self.turn_phase_probe.clone(),
255 )
256 .await
257 {
258 Ok(finalized) => finalized,
259 Err(err) => {
260 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
261 return Err(RuntimeError::new(
262 RuntimeErrorCode::PluginFinalizeTurn,
263 err.to_string(),
264 ));
265 }
266 };
267 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
268
269 let mut returned_turn = finalized.turn;
270 self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
271 self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
272 let queued_work_completion_trace = queued_work_completions.clone();
273 let pending_attachment_ids = self
274 .host
275 .core
276 .durability
277 .attachment_store
278 .pending_manifest_commit_ids();
279 if let Err(err) = turn_pipeline
280 .final_commit(
281 &mut returned_turn,
282 self.session.as_mut(),
283 &turn_usage_delta,
284 Some(&trace_turn_id),
285 queued_work_completions,
286 pending_attachment_ids.clone(),
287 )
288 .await
289 {
290 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
291 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
292 return Err(err);
293 }
294 self.host
295 .core
296 .durability
297 .attachment_store
298 .mark_manifest_committed(&pending_attachment_ids);
299 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
300
301 emit_session_events_to_sink(events, finalized.events).await;
302 self.state = turn_pipeline.into_final_state();
303 if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
304 && let Some(session) = self.session.as_mut()
305 {
306 let protocol_session = Arc::clone(session.plugins().protocol_session());
307 let session_id = self.state.session_id.clone();
308 protocol_session
309 .restore_session(
310 crate::plugin::ProtocolSessionContext::new(session, &session_id),
311 &self.state,
312 )
313 .await
314 .map_err(|err| {
315 RuntimeError::new(
316 RuntimeErrorCode::Other("protocol_restore_session".to_string()),
317 err.to_string(),
318 )
319 })?;
320 }
321 if !queued_work_completion_trace.is_empty() {
322 crate::trace::emit_trace(
323 &self.host.core.tracing.trace_sink,
324 &self.host.core.tracing.trace_context,
325 lash_trace::TraceContext::default()
326 .for_session(returned_turn.state.session_id.clone())
327 .for_turn_index(returned_turn.state.turn_index)
328 .for_turn(trace_turn_id.clone()),
329 lash_trace::TraceEvent::Custom {
330 name: "queued_work.completed".to_string(),
331 payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
332 },
333 self.host.core.clock.as_ref(),
334 );
335 }
336 self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
337 self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
338 .await?;
339 self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
340 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
341
342 self.emit_completed_turn_trace(
343 &returned_turn.state,
344 &returned_turn.outcome,
345 &trace_turn_id,
346 );
347 Ok(returned_turn)
348 }
349
350 fn emit_completed_turn_trace(
351 &self,
352 state: &SessionSnapshot,
353 outcome: &TurnOutcome,
354 trace_turn_id: &str,
355 ) {
356 if self.host.core.tracing.trace_sink.is_none() {
357 return;
358 }
359
360 let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
361 crate::trace::emit_trace(
362 &self.host.core.tracing.trace_sink,
363 &self.host.core.tracing.trace_context,
364 lash_trace::TraceContext::default()
365 .for_session(state.session_id.clone())
366 .for_turn_index(state.turn_index)
367 .for_turn(trace_turn_id.to_string()),
368 lash_trace::TraceEvent::TurnCompleted {
369 status: status.to_string(),
370 done_reason: done_reason.to_string(),
371 agent_frame_switch,
372 },
373 self.host.core.clock.as_ref(),
374 );
375 }
376
377 async fn emit_turn_persisted_event(
378 &self,
379 returned_turn: &AssembledTurn,
380 scoped_effect_controller: &ScopedEffectController<'_>,
381 trace_turn_id: &str,
382 ) -> Result<(), RuntimeError> {
383 let Some(session) = self.session.as_ref() else {
384 return Ok(());
385 };
386 let Ok(manager) = self.runtime_session_services() else {
387 return Ok(());
388 };
389 let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
390 let phase_controller = scoped_child_turn_controller(
391 scoped_effect_controller,
392 &self.state.session_id,
393 &phase_turn_id,
394 )?;
395 let direct_completions = manager.direct_completion_client(
396 RuntimeEffectControllerHandle::borrowed(phase_controller),
397 Some(phase_turn_id),
398 );
399
400 session
401 .plugins()
402 .emit_runtime_event_with_phase_probe(
403 crate::PluginLifecycleEvent::TurnPersisted(Box::new(
404 crate::SessionStateChangedContext {
405 session_id: self.state.session_id.clone(),
406 state: crate::SessionReadView::from_snapshot(&returned_turn.state),
407 sessions: manager.state_service(),
408 session_graph: manager.graph_service(),
409 direct_completions,
410 },
411 )),
412 self.turn_phase_probe.clone(),
413 )
414 .await;
415 Ok(())
416 }
417
418 pub async fn stream_turn(
420 &mut self,
421 input: TurnInput,
422 opts: TurnOptions<'_>,
423 ) -> Result<AssembledTurn, RuntimeError> {
424 let cancel = opts.cancel.clone();
425 let scoped_effect_controller = opts.scoped_effect_controller();
426 self.stream_turn_with_scoped_effect_controller_inner(
427 input,
428 opts.events_or_noop(),
429 opts.turn_events_or_noop(),
430 scoped_effect_controller,
431 cancel,
432 None,
433 )
434 .await
435 }
436
437 pub async fn stream_next_queued_work(
438 &mut self,
439 opts: TurnOptions<'_>,
440 ) -> Result<Option<AssembledTurn>, RuntimeError> {
441 self.stream_queued_work(opts, None).await
442 }
443
444 pub async fn stream_selected_queued_work(
445 &mut self,
446 opts: TurnOptions<'_>,
447 batch_ids: &[String],
448 ) -> Result<Option<AssembledTurn>, RuntimeError> {
449 self.stream_queued_work(opts, Some(batch_ids)).await
450 }
451
452 async fn stream_queued_work(
453 &mut self,
454 opts: TurnOptions<'_>,
455 selected_batch_ids: Option<&[String]>,
456 ) -> Result<Option<AssembledTurn>, RuntimeError> {
457 if self.drain_next_session_command().await?.is_some() {
458 return Ok(None);
459 }
460 let Some(store) = self
461 .session
462 .as_ref()
463 .and_then(|session| session.history_store())
464 else {
465 return Ok(None);
466 };
467 let claim = if let Some(batch_ids) = selected_batch_ids {
468 store
469 .claim_ready_queued_work_by_batch_ids(
470 &self.state.session_id,
471 &self.runtime_scope_id,
472 crate::QueuedWorkClaimBoundary::Idle,
473 crate::QUEUED_WORK_CLAIM_TTL_MS,
474 batch_ids,
475 )
476 .await
477 } else {
478 store
479 .claim_ready_queued_work(
480 &self.state.session_id,
481 &self.runtime_scope_id,
482 crate::QueuedWorkClaimBoundary::Idle,
483 crate::QUEUED_WORK_CLAIM_TTL_MS,
484 64,
485 )
486 .await
487 }
488 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
489 let Some(claim) = claim else {
490 return Ok(None);
491 };
492 let mut work = claim.materialize_for_turn();
493 let turn_id = work
494 .input
495 .trace_turn_id
496 .clone()
497 .or_else(|| Some(opts.execution_scope_id().to_owned()))
498 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
499 work.input.trace_turn_id = Some(turn_id.clone());
500 let causes = work.turn_causes.clone();
501 emit_queued_work_started_to_sink(
502 opts.turn_events_or_noop(),
503 crate::QueuedWorkClaimBoundary::Idle,
504 &claim,
505 causes.clone(),
506 )
507 .await;
508 crate::trace::emit_trace(
509 &self.host.core.tracing.trace_sink,
510 &self.host.core.tracing.trace_context,
511 lash_trace::TraceContext::default()
512 .for_session(self.state.session_id.clone())
513 .for_turn_index(self.state.turn_index + 1)
514 .for_turn(turn_id.clone()),
515 lash_trace::TraceEvent::Custom {
516 name: "queued_work.claimed".to_string(),
517 payload: queued_work_trace_payload(
518 crate::QueuedWorkClaimBoundary::Idle,
519 &claim,
520 &causes,
521 ),
522 },
523 self.host.core.clock.as_ref(),
524 );
525 let cancel = opts.cancel.clone();
526 let scoped_effect_controller = opts.scoped_effect_controller();
527 self.stream_turn_with_scoped_effect_controller_inner(
528 work.input,
529 opts.events_or_noop(),
530 opts.turn_events_or_noop(),
531 scoped_effect_controller,
532 cancel,
533 Some(claim),
534 )
535 .await
536 .map(Some)
537 }
538
539 fn ensure_durable_store_facets_for_scope(
547 &self,
548 scoped_effect_controller: &ScopedEffectController<'_>,
549 ) -> Result<(), RuntimeError> {
550 if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
551 {
552 return Ok(());
553 }
554 if self
555 .host
556 .core
557 .durability
558 .attachment_store
559 .persistence()
560 .durability_tier()
561 != crate::DurabilityTier::Durable
562 {
563 return Err(RuntimeError::durable_store_required(
564 crate::DurableStoreFacet::AttachmentStore,
565 ));
566 }
567 if self
568 .host
569 .core
570 .durability
571 .process_env_store
572 .durability_tier()
573 != crate::DurabilityTier::Durable
574 {
575 return Err(RuntimeError::durable_store_required(
576 crate::DurableStoreFacet::ProcessEnvStore,
577 ));
578 }
579 if let Some(store) = self
580 .session
581 .as_ref()
582 .and_then(|session| session.history_store())
583 && store.durability_tier() != crate::DurabilityTier::Durable
584 {
585 return Err(RuntimeError::durable_store_required(
586 crate::DurableStoreFacet::SessionStore,
587 ));
588 }
589 if let Some(process_registry) = self.host.process_registry.as_ref()
590 && process_registry.durability_tier() != crate::DurabilityTier::Durable
591 {
592 return Err(RuntimeError::durable_store_required(
593 crate::DurableStoreFacet::ProcessRegistry,
594 ));
595 }
596 Ok(())
597 }
598
599 async fn stream_turn_with_scoped_effect_controller_inner(
600 &mut self,
601 mut input: TurnInput,
602 events: &dyn EventSink,
603 turn_events: &dyn TurnActivitySink,
604 scoped_effect_controller: ScopedEffectController<'_>,
605 cancel: CancellationToken,
606 queued_claim: Option<crate::QueuedWorkClaim>,
607 ) -> Result<AssembledTurn, RuntimeError> {
608 if queued_claim.is_none() {
609 while self.drain_next_session_command().await?.is_some() {}
610 }
611 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
612 && scoped_effect_controller
613 .execution_scope()
614 .validates_turn_trace_id()
615 && input_turn_id != scoped_effect_controller.scope_id()
616 {
617 return Err(RuntimeError::new(
618 RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
619 format!(
620 "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
621 scoped_effect_controller.scope_id()
622 ),
623 ));
624 }
625 self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
626 input
627 .trace_turn_id
628 .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
629 self.stream_turn_inner(
630 input.clone(),
631 events,
632 turn_events,
633 scoped_effect_controller,
634 cancel.clone(),
635 queued_claim,
636 )
637 .await
638 }
639
640 pub async fn stream_turn_with_agent_frames(
649 &mut self,
650 input: TurnInput,
651 opts: TurnOptions<'_>,
652 ) -> Result<AgentFrameRun, RuntimeError> {
653 let cancel = opts.cancel.clone();
654 let scoped_effect_controller = opts.scoped_effect_controller();
655 self.stream_turn_with_agent_frames_inner(
656 input,
657 opts.events_or_noop(),
658 opts.turn_events_or_noop(),
659 scoped_effect_controller,
660 cancel,
661 )
662 .await
663 }
664
665 async fn stream_turn_with_agent_frames_inner(
666 &mut self,
667 mut input: TurnInput,
668 events: &dyn EventSink,
669 turn_events: &dyn TurnActivitySink,
670 scoped_effect_controller: ScopedEffectController<'_>,
671 cancel: CancellationToken,
672 ) -> Result<AgentFrameRun, RuntimeError> {
673 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
674 && scoped_effect_controller
675 .execution_scope()
676 .validates_turn_trace_id()
677 && input_turn_id != scoped_effect_controller.scope_id()
678 {
679 return Err(RuntimeError::new(
680 RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
681 format!(
682 "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
683 scoped_effect_controller.scope_id()
684 ),
685 ));
686 }
687 let follow_protocol_turn_options = input.protocol_turn_options.clone();
688 let follow_turn_context = input.turn_context.clone();
689 let follow_trace_turn_id = input
690 .trace_turn_id
691 .clone()
692 .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
693 input
694 .trace_turn_id
695 .get_or_insert(follow_trace_turn_id.clone());
696 let mut turns = Vec::new();
697 loop {
698 let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
699 input.trace_turn_id = Some(turn_trace_turn_id.clone());
700 let turn_effect_controller = if turns.is_empty() {
701 scoped_effect_controller.clone()
702 } else {
703 ScopedEffectController::borrowed(
704 scoped_effect_controller.controller(),
705 ExecutionScope::turn(&self.state.session_id, &turn_trace_turn_id),
706 )?
707 };
708 let turn = self
709 .stream_turn_with_scoped_effect_controller_inner(
710 input,
711 events,
712 turn_events,
713 turn_effect_controller,
714 cancel.clone(),
715 None,
716 )
717 .await?;
718 let switched_frame = match &turn.outcome {
719 TurnOutcome::AgentFrameSwitch { frame_id, task } => {
720 Some((frame_id.clone(), task.clone()))
721 }
722 _ => None,
723 };
724 turns.push(turn);
725
726 let Some((_frame_id, task)) = switched_frame else {
727 return Ok(AgentFrameRun { turns });
728 };
729 input = turn_input_from_text(task);
730 input.protocol_turn_options = follow_protocol_turn_options.clone();
731 input.turn_context = follow_turn_context.clone();
732 }
733 }
734
735 async fn stream_turn_inner(
736 &mut self,
737 mut input: TurnInput,
738 events: &dyn EventSink,
739 turn_events: &dyn TurnActivitySink,
740 scoped_effect_controller: ScopedEffectController<'_>,
741 cancel: CancellationToken,
742 queued_claim: Option<crate::QueuedWorkClaim>,
743 ) -> Result<AssembledTurn, RuntimeError> {
744 self.refresh_session_graph_from_store()
745 .await
746 .map_err(session_head_refresh_error)?;
747 let input_trace_turn_id = input.trace_turn_id.clone();
748 let queued_turn_work = queued_claim
749 .as_ref()
750 .map(crate::QueuedWorkClaim::materialize_for_turn);
751 if let Some(work) = queued_turn_work.as_ref()
752 && input.items.is_empty()
753 && input.image_blobs.is_empty()
754 {
755 input = work.input.clone();
756 if input.trace_turn_id.is_none() {
757 input.trace_turn_id = input_trace_turn_id;
758 }
759 }
760 if self
761 .session
762 .as_ref()
763 .and_then(|session| session.history_store())
764 .is_some()
765 {
766 ensure_durable_effect_input(&input)?;
767 }
768 if let Some(extension) = &input.protocol_extension
769 && let Some(session) = self.session.as_ref()
770 {
771 let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
772 protocol_session
773 .validate_turn_extension(extension)
774 .await
775 .map_err(|err| {
776 RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
777 })?;
778 }
779 let previous_prompt_usage = self.state.last_prompt_usage.clone();
780 let normalized = match self
781 .normalize_input_items(&input.items, &input.image_blobs)
782 .await
783 {
784 Ok(items) => items,
785 Err(e) => {
786 self.state.last_prompt_usage = None;
787 let mut assembler = TurnAssembler::default();
788 let error_event = SessionEvent::Error {
789 message: e.clone(),
790 envelope: Some(crate::session_model::ErrorEnvelope {
791 kind: "input_validation".to_string(),
792 code: Some("invalid_turn_input".to_string()),
793 terminal_reason: None,
794 user_message: e.clone(),
795 raw: None,
796 }),
797 };
798 assembler.push(&error_event);
799 emit_turn_activity_to_sink(
800 turn_events,
801 TurnActivity::independent(TurnEvent::Error { message: e }),
802 )
803 .await;
804 emit_session_event_to_sink(events, error_event).await;
805 let outcome_event = SessionEvent::TurnOutcome {
806 outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
807 };
808 assembler.push(&outcome_event);
809 emit_session_event_to_sink(events, outcome_event).await;
810 assembler.push(&SessionEvent::Done);
811 emit_session_event_to_sink(events, SessionEvent::Done).await;
812 return Ok(assembler.finish(
813 self.state.to_snapshot(),
814 false,
815 None,
816 &self.host.core.control.termination,
817 ));
818 }
819 };
820 let turn_index = self.state.turn_index + 1;
821 let trace_turn_id = input
822 .trace_turn_id
823 .clone()
824 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
825 if self.host.core.tracing.trace_sink.is_some() {
826 let mut trace_metadata = std::collections::BTreeMap::new();
827 trace_metadata.insert(
828 "input_item_count".to_string(),
829 serde_json::json!(normalized.len()),
830 );
831 crate::trace::emit_trace(
832 &self.host.core.tracing.trace_sink,
833 &self.host.core.tracing.trace_context,
834 lash_trace::TraceContext::default()
835 .for_session(self.state.session_id.clone())
836 .for_turn_index(turn_index)
837 .for_turn(trace_turn_id.clone()),
838 lash_trace::TraceEvent::TurnStarted {
839 metadata: trace_metadata,
840 },
841 self.host.core.clock.as_ref(),
842 );
843 }
844
845 let base_read_model = self.state.read_model();
846 let base_messages = base_read_model.messages;
847 let base_render_cache = base_read_model.prompt_render_cache;
848 let mut turn_delta = Vec::new();
849 let initial_turn_causes = queued_turn_work
850 .as_ref()
851 .map(|work| work.turn_causes.clone())
852 .unwrap_or_default();
853 turn_delta.extend(
854 initial_turn_causes
855 .iter()
856 .map(crate::TurnCause::to_event_message),
857 );
858
859 let user_id = fresh_message_id();
860 let mut user_parts: Vec<Part> = Vec::new();
861 for item in normalized {
862 match item {
863 NormalizedItem::Text(text) => {
864 if text.is_empty() {
865 continue;
866 }
867 user_parts.push(Part {
868 id: format!("{}.p{}", user_id, user_parts.len()),
869 kind: PartKind::Text,
870 content: text,
871 attachment: None,
872 tool_call_id: None,
873 tool_name: None,
874 tool_replay: None,
875 prune_state: PruneState::Intact,
876 reasoning_meta: None,
877 response_meta: None,
878 });
879 }
880 NormalizedItem::Image(reference) => {
881 user_parts.push(Part {
882 id: format!("{}.p{}", user_id, user_parts.len()),
883 kind: PartKind::Image,
884 content: String::new(),
885 attachment: Some(crate::session_model::message::PartAttachment {
886 reference,
887 }),
888 tool_call_id: None,
889 tool_name: None,
890 tool_replay: None,
891 prune_state: PruneState::Intact,
892 reasoning_meta: None,
893 response_meta: None,
894 });
895 }
896 }
897 }
898 if user_parts.is_empty() && initial_turn_causes.is_empty() {
899 user_parts.push(Part {
900 id: format!("{}.p0", user_id),
901 kind: PartKind::Text,
902 content: String::new(),
903 attachment: None,
904 tool_call_id: None,
905 tool_name: None,
906 tool_replay: None,
907 prune_state: PruneState::Intact,
908 reasoning_meta: None,
909 response_meta: None,
910 });
911 }
912 if !user_parts.is_empty() {
913 reassign_part_ids(&user_id, &mut user_parts);
914 turn_delta.push(Message {
915 id: user_id.clone(),
916 role: MessageRole::User,
917 parts: shared_parts(user_parts),
918 origin: None,
919 });
920 }
921
922 let manager = self
923 .runtime_session_services_for_turn(None)
924 .map_err(|err| {
925 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
926 })?;
927 let plugin_session = self
928 .session
929 .as_ref()
930 .map(|s| Arc::clone(s.plugins()))
931 .ok_or_else(|| {
932 RuntimeError::new(
933 RuntimeErrorCode::ContextPrepareTurn,
934 "runtime session not available",
935 )
936 })?;
937 let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
938 let prepare_phase_controller = scoped_child_turn_controller(
939 &scoped_effect_controller,
940 &self.state.session_id,
941 &prepare_phase_turn_id,
942 )?;
943 let turn_ctx = crate::TurnTransformContext {
944 session_id: self.state.session_id.clone(),
945 state: self.read_view(),
946 prompt_usage: previous_prompt_usage.clone(),
947 max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
948 sessions: manager.state_service(),
949 session_lifecycle: manager.lifecycle_service(),
950 session_graph: manager.graph_service(),
951 scoped_effect_controller: scoped_effect_controller.clone(),
952 direct_completions: manager.direct_completion_client(
953 RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
954 Some(prepare_phase_turn_id),
955 ),
956 };
957 self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
958 let prepared_context = plugin_session
959 .prepare_turn_context(
960 &turn_ctx,
961 crate::session_model::context::PreparedContext {
962 messages: crate::MessageSequence::from_base_and_delta(
963 base_messages,
964 turn_delta,
965 )
966 .with_base_render_cache(base_render_cache),
967 ..Default::default()
968 },
969 self.turn_phase_probe.clone(),
970 )
971 .await
972 .map_err(|err| {
973 RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
974 })?;
975 self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
976 drop(turn_ctx);
981 let messages = prepared_context.messages;
982 if let Some(session) = self.session.as_mut() {
983 session
984 .set_context_overlay(
985 prepared_context.tool_providers,
986 prepared_context.prompt_contributions,
987 prepared_context.include_base_tools,
988 )
989 .map_err(|err| {
990 RuntimeError::new(
991 RuntimeErrorCode::Other("session_tool_registry".to_string()),
992 err.to_string(),
993 )
994 })?;
995 }
996
997 self.state.last_prompt_usage = None;
998 Box::pin(self.stream_prepared_turn(
999 messages,
1000 previous_prompt_usage,
1001 input.protocol_turn_options.clone(),
1002 input.protocol_extension.clone(),
1003 input.turn_context.clone(),
1004 initial_turn_causes,
1005 trace_turn_id,
1006 turn_index,
1007 events,
1008 turn_events,
1009 scoped_effect_controller,
1010 cancel,
1011 queued_claim,
1012 ))
1013 .await
1014 }
1015
1016 pub async fn run_turn_assembled(
1018 &mut self,
1019 input: TurnInput,
1020 cancel: CancellationToken,
1021 scoped_effect_controller: ScopedEffectController<'_>,
1022 ) -> Result<AssembledTurn, RuntimeError> {
1023 self.stream_turn(input, TurnOptions::new(cancel, scoped_effect_controller))
1024 .await
1025 }
1026
1027 #[allow(clippy::too_many_arguments)]
1029 pub async fn stream_prepared_turn(
1030 &mut self,
1031 messages: crate::MessageSequence,
1032 _previous_prompt_usage: Option<PromptUsage>,
1033 protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1034 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1035 turn_context: crate::TurnContext,
1036 initial_turn_causes: Vec<crate::TurnCause>,
1037 trace_turn_id: String,
1038 turn_index: usize,
1039 events: &dyn EventSink,
1040 turn_events: &dyn TurnActivitySink,
1041 scoped_effect_controller: ScopedEffectController<'_>,
1042 cancel: CancellationToken,
1043 initial_queue_claim: Option<crate::QueuedWorkClaim>,
1044 ) -> Result<AssembledTurn, RuntimeError> {
1045 let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
1046 let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
1047 let mut turn_policy = self.state.effective_policy().clone();
1048 let turn_provider_override = turn_context.provider().cloned();
1049 if let Some(provider) = turn_provider_override.as_ref() {
1050 turn_policy.provider_id = provider.kind().to_string();
1051 }
1052 if let Some(model) = turn_context.model_spec() {
1053 turn_policy.model = model.clone();
1054 }
1055 let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
1056 let effective_protocol_turn_options = protocol_turn_options
1057 .clone()
1058 .map(|options| session_protocol_turn_options.merged_with_override(&options))
1059 .unwrap_or(session_protocol_turn_options);
1060 let manager = self
1061 .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1062 .map_err(|err| {
1063 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1064 })?;
1065 let plugins = {
1066 let session = self
1067 .session
1068 .as_ref()
1069 .expect("lash runtime session must be available");
1070 Arc::clone(session.plugins())
1071 };
1072 let mut assembler = TurnAssembler::new();
1073 self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
1074 let prepared = {
1080 let prepare_turn = plugins.prepare_turn_with_phase_probe(
1081 PrepareTurnRequest {
1082 session_id: self.state.session_id.clone(),
1083 state: crate::SessionReadView::from_runtime_state(
1084 &self.state,
1085 turn_policy.clone(),
1086 effective_protocol_turn_options.clone(),
1087 ),
1088 messages,
1089 sessions: manager.state_service(),
1090 session_lifecycle: manager.lifecycle_service(),
1091 session_graph: manager.graph_service(),
1092 turn_context: turn_context.clone(),
1093 },
1094 self.turn_phase_probe.clone(),
1095 );
1096 let mut prepare_turn = Box::pin(prepare_turn);
1097
1098 loop {
1099 tokio::select! {
1100 prepared = prepare_turn.as_mut() => {
1101 let prepared = prepared.map_err(|err| {
1102 RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
1103 })?;
1104 self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
1105 break prepared;
1106 }
1107 maybe_event = event_rx.recv() => {
1108 if let Some(event) = maybe_event {
1109 emit_runtime_stream_event_to_sinks(
1110 events,
1111 turn_events,
1112 event,
1113 &mut assembler,
1114 )
1115 .await;
1116 }
1117 }
1118 }
1119 }
1120 };
1121 for event in &prepared.events {
1122 assembler.push(event);
1123 }
1124 emit_session_events_to_sink(events, prepared.events).await;
1125 if let Some(abort) = prepared.abort {
1126 drop(event_tx);
1127
1128 let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1129 self.state.clone(),
1130 Arc::clone(&self.host.core.clock),
1131 );
1132 turn_pipeline.apply_prepared_messages(&prepared.messages);
1133 let state = turn_pipeline.into_final_state();
1134 let issue = TurnIssue {
1135 kind: "plugin".to_string(),
1136 code: Some(abort.code),
1137 terminal_reason: None,
1138 message: abort.message.clone(),
1139 raw: None,
1140 };
1141 let error_event = SessionEvent::Error {
1142 message: abort.message,
1143 envelope: Some(crate::session_model::ErrorEnvelope {
1144 kind: "plugin".to_string(),
1145 code: issue.code.clone(),
1146 terminal_reason: None,
1147 user_message: issue.message.clone(),
1148 raw: None,
1149 }),
1150 };
1151 assembler.push(&error_event);
1152 emit_turn_activity_to_sink(
1153 turn_events,
1154 TurnActivity::independent(TurnEvent::Error {
1155 message: issue.message.clone(),
1156 }),
1157 )
1158 .await;
1159 emit_session_event_to_sink(events, error_event).await;
1160 let outcome_event = SessionEvent::TurnOutcome {
1161 outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
1162 };
1163 assembler.push(&outcome_event);
1164 emit_session_event_to_sink(events, outcome_event).await;
1165 assembler.push(&SessionEvent::Done);
1166 emit_session_event_to_sink(events, SessionEvent::Done).await;
1167 return Ok(assembler.finish(
1168 state.to_snapshot(),
1169 cancel.is_cancelled(),
1170 Some(issue),
1171 &self.host.core.control.termination,
1172 ));
1173 }
1174 let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1175 self.state.clone(),
1176 Arc::clone(&self.host.core.clock),
1177 );
1178 let store = self
1179 .session
1180 .as_ref()
1181 .and_then(|session| session.history_store());
1182 let progress_store = if scoped_effect_controller.controller().durability_tier()
1186 == crate::DurabilityTier::Durable
1187 {
1188 None
1189 } else {
1190 store.as_ref().map(|store| store.as_ref())
1191 };
1192 turn_pipeline
1193 .prepared_checkpoint(
1194 progress_store,
1195 turn_policy.clone(),
1196 turn_index,
1197 &prepared.messages,
1198 self.session.as_mut(),
1199 )
1200 .await
1201 .map_err(|err| {
1202 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
1203 })?;
1204 let resolved_turn_policy = if let Some(provider) = turn_provider_override {
1205 RuntimeSessionPolicy::from_provider(
1206 turn_policy.clone(),
1207 provider.with_clock(Arc::clone(&self.host.core.clock)),
1208 )
1209 .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1210 } else {
1211 self.host
1212 .resolve_session_policy(&self.state.session_id, turn_policy.clone())
1213 .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1214 };
1215 let manager = self
1216 .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1217 .map_err(|err| {
1218 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1219 })?;
1220 let cancel_state = cancel.clone();
1221 let finish_scoped_effect_controller = scoped_effect_controller.clone();
1222 let session = self
1223 .session
1224 .take()
1225 .expect("lash runtime session must be available");
1226 let mut driver = RuntimeTurnDriver {
1227 session,
1228 policy: resolved_turn_policy,
1229 host: self.host.clone(),
1230 turn_id: scoped_effect_controller.scope_id().to_string(),
1231 scoped_effect_controller,
1232 session_id: self.state.session_id.clone(),
1233 turn_index,
1234 turn_pipeline,
1235 llm_stream_summaries: HashMap::new(),
1236 next_llm_ordinal: 0,
1237 session_services: manager,
1238 protocol_turn_options: effective_protocol_turn_options,
1239 protocol_extension,
1240 turn_context,
1241 turn_causes: initial_turn_causes,
1242 pending_queue_claims: initial_queue_claim.into_iter().collect(),
1243 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1244 turn_phase_probe: self.turn_phase_probe.clone(),
1245 };
1246 let protocol_run_offset = 0;
1247 self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
1248 let run_result = drive_turn_to_completion(
1249 driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
1250 &mut event_rx,
1251 &mut assembler,
1252 &child_usage_event_relay,
1253 events,
1254 turn_events,
1255 )
1256 .await;
1257 let (new_messages, _new_protocol_iteration) = match run_result {
1258 Ok(result) => result,
1259 Err(err) => {
1260 self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1261 let RuntimeTurnDriver { session, .. } = driver;
1262 self.session = Some(session);
1263 return Err(err);
1264 }
1265 };
1266 self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1267 tracing::debug!(
1268 new_message_count = new_messages.len(),
1269 tool_call_count = assembler.tool_calls.len(),
1270 "runtime post-run_task"
1271 );
1272
1273 let RuntimeTurnDriver {
1274 session,
1275 policy,
1276 turn_pipeline,
1277 pending_queue_claims,
1278 ..
1279 } = driver;
1280 self.session = Some(session);
1281 self.finish_turn(
1282 TurnFinishInput {
1283 turn_pipeline,
1284 assembler,
1285 new_messages,
1286 policy,
1287 turn_index,
1288 queued_work_completions: pending_queue_claims
1289 .iter()
1290 .map(crate::QueuedWorkClaim::completion)
1291 .collect(),
1292 trace_turn_id,
1293 },
1294 events,
1295 &finish_scoped_effect_controller,
1296 &cancel_state,
1297 )
1298 .await
1299 }
/// Normalizes `items` by delegating to the free function `normalize_input_items`.
///
/// `items` and `image_blobs` are passed through unchanged; the third argument is
/// taken from `self.host.core.durability.attachment_store`. The delegate's
/// `Result<Vec<NormalizedItem>, String>` is returned as-is.
async fn normalize_input_items(
    &self,
    items: &[InputItem],
    image_blobs: &HashMap<String, Vec<u8>>,
) -> Result<Vec<NormalizedItem>, String> {
    // An unqualified call never resolves to a method, so this targets the free
    // function of the same name that is in scope rather than recursing.
    normalize_input_items(
        items,
        image_blobs,
        self.host.core.durability.attachment_store.as_ref(),
    )
    .await
}
1312}
1313
1314fn turn_input_from_text(text: String) -> TurnInput {
1315 TurnInput {
1316 items: vec![InputItem::Text { text }],
1317 image_blobs: HashMap::new(),
1318 protocol_turn_options: None,
1319 trace_turn_id: None,
1320 protocol_extension: None,
1321 turn_context: crate::TurnContext::default(),
1322 }
1323}
1324
/// Derives a turn id from `root_turn_id` and the number of turns already completed.
///
/// With zero completed turns the root id is returned unchanged; otherwise the
/// result is `"{root_turn_id}:agent-frame:{completed_turn_count}"`.
fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
    match completed_turn_count {
        0 => root_turn_id.to_owned(),
        follow_index => format!("{root_turn_id}:agent-frame:{follow_index}"),
    }
}
1332
1333pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1334 if input.protocol_extension.is_some() {
1335 return Err(RuntimeError::new(
1336 RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1337 "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1338 ));
1339 }
1340 input
1341 .turn_context
1342 .live_plugin_inputs()
1343 .durable_effect_rejection()?;
1344 Ok(())
1345}
1346
1347async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1348 if !events.is_noop() {
1349 events.emit(activity).await;
1350 }
1351}
1352
/// Drives `run_future` to completion while forwarding streamed runtime events.
///
/// While the future is pending, every event received on `event_rx` is passed to
/// `emit_runtime_stream_event_to_sinks`, which records it in `assembler` and
/// emits it on `events` / `turn_events`. Once the future resolves,
/// `child_usage_event_relay.clear()` is called, the remaining events are drained
/// from `event_rx` until `recv()` yields `None`, and the future's own result is
/// returned unchanged.
///
/// NOTE(review): the drain loop only ends once every sender for `event_rx` has
/// been dropped. Presumably `clear()` releases the relay's sender and the run
/// future owns the other one — confirm against `ChildUsageEventRelay` and callers.
async fn drive_turn_to_completion<F>(
    run_future: F,
    event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
    assembler: &mut TurnAssembler,
    child_usage_event_relay: &ChildUsageEventRelay,
    events: &dyn EventSink,
    turn_events: &dyn TurnActivitySink,
) -> Result<(crate::MessageSequence, usize), RuntimeError>
where
    F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
{
    let run_result = {
        // The boxed future lives only inside this block, so it is dropped
        // before the drain loop below starts.
        let mut run_future = Box::pin(run_future);
        loop {
            tokio::select! {
                maybe_event = event_rx.recv() => {
                    // A `None` here (no event available because the channel is
                    // closed) is ignored; the loop simply polls again.
                    if let Some(event) = maybe_event {
                        emit_runtime_stream_event_to_sinks(
                            events,
                            turn_events,
                            event,
                            assembler,
                        )
                        .await;
                    }
                }
                completed = run_future.as_mut() => {
                    // Clear the relay before leaving the loop, then hand the
                    // future's result out of the block.
                    child_usage_event_relay.clear();
                    break completed;
                }
            }
        }
    };
    // Forward whatever is still queued after the run future finished.
    while let Some(event) = event_rx.recv().await {
        emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
    }
    run_result
}
1400
1401async fn emit_runtime_stream_event_to_sinks(
1402 events: &dyn EventSink,
1403 turn_events: &dyn TurnActivitySink,
1404 event: RuntimeStreamEvent,
1405 assembler: &mut TurnAssembler,
1406) {
1407 match event {
1408 RuntimeStreamEvent::Session(event) => {
1409 assembler.push(&event);
1410 emit_session_event_to_sink(events, event).await;
1411 }
1412 RuntimeStreamEvent::Turn(activity) => {
1413 assembler.push_turn_activity(&activity);
1414 emit_turn_activity_to_sink(turn_events, activity).await;
1415 }
1416 }
1417}
1418
#[cfg(test)]
mod tests {
    use super::agent_frame_follow_turn_id;

    /// Checks the id produced for zero, one and two completed turns.
    #[test]
    fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
        let expectations = [
            (0, "root-turn"),
            (1, "root-turn:agent-frame:1"),
            (2, "root-turn:agent-frame:2"),
        ];
        for (completed_turn_count, expected_id) in expectations {
            assert_eq!(
                agent_frame_follow_turn_id("root-turn", completed_turn_count),
                expected_id
            );
        }
    }
}