1use super::*;
2
3fn trace_fields_from_outcome(
4 outcome: &TurnOutcome,
5) -> (
6 &'static str,
7 &'static str,
8 Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10 match outcome {
11 TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12 ("completed", "assistant_message", None)
13 }
14 TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
15 ("completed", "submitted_value", None)
16 }
17 TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
18 TurnOutcome::AgentFrameSwitch { frame_id, .. } => (
19 "completed",
20 "agent_frame_switch",
21 Some(lash_trace::TraceAgentFrameSwitch {
22 frame_id: frame_id.clone(),
23 }),
24 ),
25 TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
26 }
27}
28
29fn trace_stop_reason(stop: &TurnStop) -> &'static str {
30 match stop {
31 TurnStop::Cancelled => "cancelled",
32 TurnStop::Incomplete => "incomplete",
33 TurnStop::InvalidInput => "invalid_input",
34 TurnStop::MaxTurns => "max_turns",
35 TurnStop::ToolFailure => "tool_failure",
36 TurnStop::ProviderError => "provider_error",
37 TurnStop::PluginAbort => "plugin_abort",
38 TurnStop::RuntimeError => "runtime_error",
39 TurnStop::SubmittedError { .. } => "submitted_error",
40 TurnStop::ToolError { .. } => "tool_error",
41 }
42}
43
44fn session_head_refresh_error(err: SessionError) -> RuntimeError {
45 RuntimeError::new(
46 RuntimeErrorCode::Other("session_head_refresh".to_string()),
47 err.to_string(),
48 )
49}
50
51fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
52 match payload {
53 crate::QueuedWorkPayload::TurnInput { .. } => "turn_input",
54 crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
55 crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
56 }
57}
58
59fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
60 claim
61 .batches
62 .iter()
63 .map(|batch| batch.batch_id.clone())
64 .collect()
65}
66
67fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
68 format!("{parent_turn_id}:{phase}")
69}
70
71fn scoped_child_turn_controller<'run>(
72 scoped_effect_controller: &'run ScopedEffectController<'_>,
73 session_id: &str,
74 turn_id: &str,
75) -> Result<ScopedEffectController<'run>, RuntimeError> {
76 ScopedEffectController::borrowed(
77 scoped_effect_controller.controller(),
78 ExecutionScope::turn(session_id, turn_id),
79 )
80}
81
82pub(in crate::runtime) fn queued_work_trace_payload(
83 boundary: crate::QueuedWorkClaimBoundary,
84 claim: &crate::QueuedWorkClaim,
85 causes: &[crate::TurnCause],
86) -> serde_json::Value {
87 serde_json::json!({
88 "boundary": boundary,
89 "claim_id": claim.claim_id,
90 "owner_id": claim.owner_id,
91 "batch_ids": queued_work_batch_ids(claim),
92 "payload_types": claim.batches.iter()
93 .flat_map(|batch| batch.items.iter())
94 .map(|item| queued_work_payload_type(&item.payload))
95 .collect::<Vec<_>>(),
96 "causes": causes,
97 })
98}
99
100pub(in crate::runtime) fn queued_work_completion_trace_payload(
101 completions: &[crate::QueuedWorkCompletion],
102) -> serde_json::Value {
103 serde_json::json!({
104 "claims": completions.iter().map(|completion| {
105 serde_json::json!({
106 "session_id": completion.session_id,
107 "claim_id": completion.claim_id,
108 "batch_ids": completion.batch_ids,
109 })
110 }).collect::<Vec<_>>(),
111 })
112}
113
114async fn emit_queued_work_started_to_sink(
115 events: &dyn TurnActivitySink,
116 boundary: crate::QueuedWorkClaimBoundary,
117 claim: &crate::QueuedWorkClaim,
118 causes: Vec<crate::TurnCause>,
119) {
120 emit_turn_activity_to_sink(
121 events,
122 TurnActivity::independent(TurnEvent::QueuedWorkStarted {
123 boundary,
124 batch_ids: queued_work_batch_ids(claim),
125 causes,
126 }),
127 )
128 .await;
129}
130
131pub(in crate::runtime) async fn send_queued_work_started_event(
132 event_tx: &mpsc::Sender<RuntimeStreamEvent>,
133 boundary: crate::QueuedWorkClaimBoundary,
134 claim: &crate::QueuedWorkClaim,
135 causes: Vec<crate::TurnCause>,
136) {
137 send_turn_activity(
138 event_tx,
139 TurnActivityId::fresh(),
140 TurnEvent::QueuedWorkStarted {
141 boundary,
142 batch_ids: queued_work_batch_ids(claim),
143 causes,
144 },
145 )
146 .await;
147}
148
149struct TurnFinishInput {
150 turn_pipeline: TurnBoundary,
151 assembler: TurnAssembler,
152 new_messages: crate::MessageSequence,
153 policy: RuntimeSessionPolicy,
154 turn_index: usize,
155 queued_work_completions: Vec<crate::QueuedWorkCompletion>,
156 trace_turn_id: String,
157}
158
159impl LashRuntime {
160 fn max_context_tokens(&self) -> usize {
161 self.state.effective_policy().context_window_tokens()
162 }
163 #[doc(hidden)]
164 pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
165 self.turn_phase_probe = Some(probe);
166 }
167
168 fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
169 if let Some(probe) = self.turn_phase_probe.as_ref() {
170 probe.begin(phase);
171 }
172 }
173
174 fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
175 if let Some(probe) = self.turn_phase_probe.as_ref() {
176 probe.end(phase);
177 }
178 }
179
180 async fn finish_turn(
181 &mut self,
182 finish: TurnFinishInput,
183 events: &dyn EventSink,
184 scoped_effect_controller: &ScopedEffectController<'_>,
185 cancel_state: &CancellationToken,
186 ) -> Result<AssembledTurn, RuntimeError> {
187 let TurnFinishInput {
188 mut turn_pipeline,
189 assembler,
190 new_messages,
191 policy,
192 turn_index,
193 queued_work_completions,
194 trace_turn_id,
195 } = finish;
196 self.policy = self.state.effective_policy().clone();
197 turn_pipeline.state_mut().policy = self.policy.clone();
198 turn_pipeline.state_mut().turn_index = turn_index;
199
200 let mut turn_usage_delta = {
201 let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
202 std::mem::take(&mut *ledger)
203 };
204 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
205 turn_usage_delta.push(TokenLedgerEntry {
206 source: "turn".to_string(),
207 model: policy.model.id.clone(),
208 usage: assembler.token_usage.clone(),
209 });
210 }
211 let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
212
213 turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
214 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
215 turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
216 }
217
218 let last_prompt_usage = assembler
219 .last_llm_usage()
220 .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
221 turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
222 let assembled_state = turn_pipeline.export_state_for_assembly();
223 let assembled = assembler.finish(
224 assembled_state,
225 cancel_state.is_cancelled(),
226 None,
227 &self.host.core.control.termination,
228 );
229
230 let Some(session) = self.session.as_ref() else {
231 self.state.apply_snapshot(&assembled.state);
232 self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
233 return Ok(assembled);
234 };
235
236 let plugins = Arc::clone(session.plugins());
237 let manager = match self.runtime_session_services_for_turn(None) {
238 Ok(manager) => manager,
239 Err(err) => {
240 return Err(RuntimeError::new(
241 RuntimeErrorCode::PluginSessionManager,
242 err.to_string(),
243 ));
244 }
245 };
246
247 self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
248 let finalized = match plugins
249 .finalize_turn_with_phase_probe(
250 assembled,
251 manager.state_service(),
252 manager.lifecycle_service(),
253 manager.graph_service(),
254 self.turn_phase_probe.clone(),
255 )
256 .await
257 {
258 Ok(finalized) => finalized,
259 Err(err) => {
260 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
261 return Err(RuntimeError::new(
262 RuntimeErrorCode::PluginFinalizeTurn,
263 err.to_string(),
264 ));
265 }
266 };
267 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
268
269 let mut returned_turn = finalized.turn;
270 self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
271 self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
272 let queued_work_completion_trace = queued_work_completions.clone();
273 let pending_attachment_ids = self
274 .host
275 .core
276 .durability
277 .attachment_store
278 .pending_manifest_commit_ids();
279 if let Err(err) = turn_pipeline
280 .final_commit(
281 &mut returned_turn,
282 self.session.as_mut(),
283 &turn_usage_delta,
284 Some(&trace_turn_id),
285 queued_work_completions,
286 pending_attachment_ids.clone(),
287 )
288 .await
289 {
290 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
291 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
292 return Err(err);
293 }
294 self.host
295 .core
296 .durability
297 .attachment_store
298 .mark_manifest_committed(&pending_attachment_ids);
299 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
300
301 emit_session_events_to_sink(events, finalized.events).await;
302 self.state = turn_pipeline.into_final_state();
303 if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
304 && let Some(session) = self.session.as_mut()
305 {
306 let protocol_session = Arc::clone(session.plugins().protocol_session());
307 let session_id = self.state.session_id.clone();
308 protocol_session
309 .restore_session(
310 crate::plugin::ProtocolSessionContext::new(session, &session_id),
311 &self.state,
312 )
313 .await
314 .map_err(|err| {
315 RuntimeError::new(
316 RuntimeErrorCode::Other("protocol_restore_session".to_string()),
317 err.to_string(),
318 )
319 })?;
320 }
321 if !queued_work_completion_trace.is_empty() {
322 crate::trace::emit_trace(
323 &self.host.core.tracing.trace_sink,
324 &self.host.core.tracing.trace_context,
325 lash_trace::TraceContext::default()
326 .for_session(returned_turn.state.session_id.clone())
327 .for_turn_index(returned_turn.state.turn_index)
328 .for_turn(trace_turn_id.clone()),
329 lash_trace::TraceEvent::Custom {
330 name: "queued_work.completed".to_string(),
331 payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
332 },
333 );
334 }
335 self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
336 self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
337 .await?;
338 self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
339 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
340
341 self.emit_completed_turn_trace(
342 &returned_turn.state,
343 &returned_turn.outcome,
344 &trace_turn_id,
345 );
346 Ok(returned_turn)
347 }
348
349 fn emit_completed_turn_trace(
350 &self,
351 state: &SessionSnapshot,
352 outcome: &TurnOutcome,
353 trace_turn_id: &str,
354 ) {
355 if self.host.core.tracing.trace_sink.is_none() {
356 return;
357 }
358
359 let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
360 crate::trace::emit_trace(
361 &self.host.core.tracing.trace_sink,
362 &self.host.core.tracing.trace_context,
363 lash_trace::TraceContext::default()
364 .for_session(state.session_id.clone())
365 .for_turn_index(state.turn_index)
366 .for_turn(trace_turn_id.to_string()),
367 lash_trace::TraceEvent::TurnCompleted {
368 status: status.to_string(),
369 done_reason: done_reason.to_string(),
370 agent_frame_switch,
371 },
372 );
373 }
374
375 async fn emit_turn_persisted_event(
376 &self,
377 returned_turn: &AssembledTurn,
378 scoped_effect_controller: &ScopedEffectController<'_>,
379 trace_turn_id: &str,
380 ) -> Result<(), RuntimeError> {
381 let Some(session) = self.session.as_ref() else {
382 return Ok(());
383 };
384 let Ok(manager) = self.runtime_session_services() else {
385 return Ok(());
386 };
387 let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
388 let phase_controller = scoped_child_turn_controller(
389 scoped_effect_controller,
390 &self.state.session_id,
391 &phase_turn_id,
392 )?;
393 let direct_completions = manager.direct_completion_client(
394 RuntimeEffectControllerHandle::borrowed(phase_controller),
395 Some(phase_turn_id),
396 );
397
398 session
399 .plugins()
400 .emit_runtime_event_with_phase_probe(
401 crate::PluginLifecycleEvent::TurnPersisted(Box::new(
402 crate::SessionStateChangedContext {
403 session_id: self.state.session_id.clone(),
404 state: crate::SessionReadView::from_snapshot(&returned_turn.state),
405 sessions: manager.state_service(),
406 session_graph: manager.graph_service(),
407 direct_completions,
408 },
409 )),
410 self.turn_phase_probe.clone(),
411 )
412 .await;
413 Ok(())
414 }
415
416 pub async fn stream_turn(
418 &mut self,
419 input: TurnInput,
420 opts: TurnOptions<'_>,
421 ) -> Result<AssembledTurn, RuntimeError> {
422 let cancel = opts.cancel.clone();
423 let scoped_effect_controller = opts.scoped_effect_controller();
424 self.stream_turn_with_scoped_effect_controller_inner(
425 input,
426 opts.events_or_noop(),
427 opts.turn_events_or_noop(),
428 scoped_effect_controller,
429 cancel,
430 None,
431 )
432 .await
433 }
434
435 pub async fn stream_next_queued_work(
436 &mut self,
437 opts: TurnOptions<'_>,
438 ) -> Result<Option<AssembledTurn>, RuntimeError> {
439 self.stream_queued_work(opts, None).await
440 }
441
442 pub async fn stream_selected_queued_work(
443 &mut self,
444 opts: TurnOptions<'_>,
445 batch_ids: &[String],
446 ) -> Result<Option<AssembledTurn>, RuntimeError> {
447 self.stream_queued_work(opts, Some(batch_ids)).await
448 }
449
450 async fn stream_queued_work(
451 &mut self,
452 opts: TurnOptions<'_>,
453 selected_batch_ids: Option<&[String]>,
454 ) -> Result<Option<AssembledTurn>, RuntimeError> {
455 if self.drain_next_session_command().await?.is_some() {
456 return Ok(None);
457 }
458 let Some(store) = self
459 .session
460 .as_ref()
461 .and_then(|session| session.history_store())
462 else {
463 return Ok(None);
464 };
465 let claim = if let Some(batch_ids) = selected_batch_ids {
466 store
467 .claim_ready_queued_work_by_batch_ids(
468 &self.state.session_id,
469 &self.runtime_scope_id,
470 crate::QueuedWorkClaimBoundary::Idle,
471 crate::QUEUED_WORK_CLAIM_TTL_MS,
472 batch_ids,
473 )
474 .await
475 } else {
476 store
477 .claim_ready_queued_work(
478 &self.state.session_id,
479 &self.runtime_scope_id,
480 crate::QueuedWorkClaimBoundary::Idle,
481 crate::QUEUED_WORK_CLAIM_TTL_MS,
482 64,
483 )
484 .await
485 }
486 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
487 let Some(claim) = claim else {
488 return Ok(None);
489 };
490 let mut work = claim.materialize_for_turn();
491 let turn_id = work
492 .input
493 .trace_turn_id
494 .clone()
495 .or_else(|| Some(opts.execution_scope_id().to_owned()))
496 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
497 work.input.trace_turn_id = Some(turn_id.clone());
498 let causes = work.turn_causes.clone();
499 emit_queued_work_started_to_sink(
500 opts.turn_events_or_noop(),
501 crate::QueuedWorkClaimBoundary::Idle,
502 &claim,
503 causes.clone(),
504 )
505 .await;
506 crate::trace::emit_trace(
507 &self.host.core.tracing.trace_sink,
508 &self.host.core.tracing.trace_context,
509 lash_trace::TraceContext::default()
510 .for_session(self.state.session_id.clone())
511 .for_turn_index(self.state.turn_index + 1)
512 .for_turn(turn_id.clone()),
513 lash_trace::TraceEvent::Custom {
514 name: "queued_work.claimed".to_string(),
515 payload: queued_work_trace_payload(
516 crate::QueuedWorkClaimBoundary::Idle,
517 &claim,
518 &causes,
519 ),
520 },
521 );
522 let cancel = opts.cancel.clone();
523 let scoped_effect_controller = opts.scoped_effect_controller();
524 self.stream_turn_with_scoped_effect_controller_inner(
525 work.input,
526 opts.events_or_noop(),
527 opts.turn_events_or_noop(),
528 scoped_effect_controller,
529 cancel,
530 Some(claim),
531 )
532 .await
533 .map(Some)
534 }
535
536 fn ensure_durable_store_facets_for_scope(
544 &self,
545 scoped_effect_controller: &ScopedEffectController<'_>,
546 ) -> Result<(), RuntimeError> {
547 if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
548 {
549 return Ok(());
550 }
551 if self
552 .host
553 .core
554 .durability
555 .attachment_store
556 .persistence()
557 .durability_tier()
558 != crate::DurabilityTier::Durable
559 {
560 return Err(RuntimeError::durable_store_required(
561 crate::DurableStoreFacet::AttachmentStore,
562 ));
563 }
564 if self
565 .host
566 .core
567 .durability
568 .process_env_store
569 .durability_tier()
570 != crate::DurabilityTier::Durable
571 {
572 return Err(RuntimeError::durable_store_required(
573 crate::DurableStoreFacet::ProcessEnvStore,
574 ));
575 }
576 if let Some(store) = self
577 .session
578 .as_ref()
579 .and_then(|session| session.history_store())
580 && store.durability_tier() != crate::DurabilityTier::Durable
581 {
582 return Err(RuntimeError::durable_store_required(
583 crate::DurableStoreFacet::SessionStore,
584 ));
585 }
586 if let Some(process_registry) = self.host.process_registry.as_ref()
587 && process_registry.durability_tier() != crate::DurabilityTier::Durable
588 {
589 return Err(RuntimeError::durable_store_required(
590 crate::DurableStoreFacet::ProcessRegistry,
591 ));
592 }
593 Ok(())
594 }
595
596 async fn stream_turn_with_scoped_effect_controller_inner(
597 &mut self,
598 mut input: TurnInput,
599 events: &dyn EventSink,
600 turn_events: &dyn TurnActivitySink,
601 scoped_effect_controller: ScopedEffectController<'_>,
602 cancel: CancellationToken,
603 queued_claim: Option<crate::QueuedWorkClaim>,
604 ) -> Result<AssembledTurn, RuntimeError> {
605 if queued_claim.is_none() {
606 while self.drain_next_session_command().await?.is_some() {}
607 }
608 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
609 && scoped_effect_controller
610 .execution_scope()
611 .validates_turn_trace_id()
612 && input_turn_id != scoped_effect_controller.scope_id()
613 {
614 return Err(RuntimeError::new(
615 RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
616 format!(
617 "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
618 scoped_effect_controller.scope_id()
619 ),
620 ));
621 }
622 self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
623 input
624 .trace_turn_id
625 .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
626 self.stream_turn_inner(
627 input.clone(),
628 events,
629 turn_events,
630 scoped_effect_controller,
631 cancel.clone(),
632 queued_claim,
633 )
634 .await
635 }
636
637 pub async fn stream_turn_with_agent_frames(
646 &mut self,
647 input: TurnInput,
648 opts: TurnOptions<'_>,
649 ) -> Result<AgentFrameRun, RuntimeError> {
650 let cancel = opts.cancel.clone();
651 let scoped_effect_controller = opts.scoped_effect_controller();
652 self.stream_turn_with_agent_frames_inner(
653 input,
654 opts.events_or_noop(),
655 opts.turn_events_or_noop(),
656 scoped_effect_controller,
657 cancel,
658 )
659 .await
660 }
661
662 async fn stream_turn_with_agent_frames_inner(
663 &mut self,
664 mut input: TurnInput,
665 events: &dyn EventSink,
666 turn_events: &dyn TurnActivitySink,
667 scoped_effect_controller: ScopedEffectController<'_>,
668 cancel: CancellationToken,
669 ) -> Result<AgentFrameRun, RuntimeError> {
670 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
671 && scoped_effect_controller
672 .execution_scope()
673 .validates_turn_trace_id()
674 && input_turn_id != scoped_effect_controller.scope_id()
675 {
676 return Err(RuntimeError::new(
677 RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
678 format!(
679 "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
680 scoped_effect_controller.scope_id()
681 ),
682 ));
683 }
684 let follow_protocol_turn_options = input.protocol_turn_options.clone();
685 let follow_turn_context = input.turn_context.clone();
686 let follow_trace_turn_id = input
687 .trace_turn_id
688 .clone()
689 .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
690 input
691 .trace_turn_id
692 .get_or_insert(follow_trace_turn_id.clone());
693 let mut turns = Vec::new();
694 loop {
695 let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
696 input.trace_turn_id = Some(turn_trace_turn_id.clone());
697 let turn_effect_controller = if turns.is_empty() {
698 scoped_effect_controller.clone()
699 } else {
700 ScopedEffectController::borrowed(
701 scoped_effect_controller.controller(),
702 ExecutionScope::turn(&self.state.session_id, &turn_trace_turn_id),
703 )?
704 };
705 let turn = self
706 .stream_turn_with_scoped_effect_controller_inner(
707 input,
708 events,
709 turn_events,
710 turn_effect_controller,
711 cancel.clone(),
712 None,
713 )
714 .await?;
715 let switched_frame = match &turn.outcome {
716 TurnOutcome::AgentFrameSwitch { frame_id, task } => {
717 Some((frame_id.clone(), task.clone()))
718 }
719 _ => None,
720 };
721 turns.push(turn);
722
723 let Some((_frame_id, task)) = switched_frame else {
724 return Ok(AgentFrameRun { turns });
725 };
726 input = turn_input_from_text(task);
727 input.protocol_turn_options = follow_protocol_turn_options.clone();
728 input.turn_context = follow_turn_context.clone();
729 }
730 }
731
732 async fn stream_turn_inner(
733 &mut self,
734 mut input: TurnInput,
735 events: &dyn EventSink,
736 turn_events: &dyn TurnActivitySink,
737 scoped_effect_controller: ScopedEffectController<'_>,
738 cancel: CancellationToken,
739 queued_claim: Option<crate::QueuedWorkClaim>,
740 ) -> Result<AssembledTurn, RuntimeError> {
741 self.refresh_session_graph_from_store()
742 .await
743 .map_err(session_head_refresh_error)?;
744 let input_trace_turn_id = input.trace_turn_id.clone();
745 let queued_turn_work = queued_claim
746 .as_ref()
747 .map(crate::QueuedWorkClaim::materialize_for_turn);
748 if let Some(work) = queued_turn_work.as_ref()
749 && input.items.is_empty()
750 && input.image_blobs.is_empty()
751 {
752 input = work.input.clone();
753 if input.trace_turn_id.is_none() {
754 input.trace_turn_id = input_trace_turn_id;
755 }
756 }
757 if self
758 .session
759 .as_ref()
760 .and_then(|session| session.history_store())
761 .is_some()
762 {
763 ensure_durable_effect_input(&input)?;
764 }
765 if let Some(extension) = &input.protocol_extension
766 && let Some(session) = self.session.as_ref()
767 {
768 let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
769 protocol_session
770 .validate_turn_extension(extension)
771 .await
772 .map_err(|err| {
773 RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
774 })?;
775 }
776 let previous_prompt_usage = self.state.last_prompt_usage.clone();
777 let normalized = match self
778 .normalize_input_items(&input.items, &input.image_blobs)
779 .await
780 {
781 Ok(items) => items,
782 Err(e) => {
783 self.state.last_prompt_usage = None;
784 let mut assembler = TurnAssembler::default();
785 let error_event = SessionEvent::Error {
786 message: e.clone(),
787 envelope: Some(crate::session_model::ErrorEnvelope {
788 kind: "input_validation".to_string(),
789 code: Some("invalid_turn_input".to_string()),
790 terminal_reason: None,
791 user_message: e.clone(),
792 raw: None,
793 }),
794 };
795 assembler.push(&error_event);
796 emit_turn_activity_to_sink(
797 turn_events,
798 TurnActivity::independent(TurnEvent::Error { message: e }),
799 )
800 .await;
801 emit_session_event_to_sink(events, error_event).await;
802 let outcome_event = SessionEvent::TurnOutcome {
803 outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
804 };
805 assembler.push(&outcome_event);
806 emit_session_event_to_sink(events, outcome_event).await;
807 assembler.push(&SessionEvent::Done);
808 emit_session_event_to_sink(events, SessionEvent::Done).await;
809 return Ok(assembler.finish(
810 self.state.to_snapshot(),
811 false,
812 None,
813 &self.host.core.control.termination,
814 ));
815 }
816 };
817 let turn_index = self.state.turn_index + 1;
818 let trace_turn_id = input
819 .trace_turn_id
820 .clone()
821 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
822 if self.host.core.tracing.trace_sink.is_some() {
823 let mut trace_metadata = std::collections::BTreeMap::new();
824 trace_metadata.insert(
825 "input_item_count".to_string(),
826 serde_json::json!(normalized.len()),
827 );
828 crate::trace::emit_trace(
829 &self.host.core.tracing.trace_sink,
830 &self.host.core.tracing.trace_context,
831 lash_trace::TraceContext::default()
832 .for_session(self.state.session_id.clone())
833 .for_turn_index(turn_index)
834 .for_turn(trace_turn_id.clone()),
835 lash_trace::TraceEvent::TurnStarted {
836 metadata: trace_metadata,
837 },
838 );
839 }
840
841 let base_read_model = self.state.read_model();
842 let base_messages = base_read_model.messages;
843 let base_render_cache = base_read_model.prompt_render_cache;
844 let mut turn_delta = Vec::new();
845 let initial_turn_causes = queued_turn_work
846 .as_ref()
847 .map(|work| work.turn_causes.clone())
848 .unwrap_or_default();
849 turn_delta.extend(
850 initial_turn_causes
851 .iter()
852 .map(crate::TurnCause::to_event_message),
853 );
854
855 let user_id = fresh_message_id();
856 let mut user_parts: Vec<Part> = Vec::new();
857 for item in normalized {
858 match item {
859 NormalizedItem::Text(text) => {
860 if text.is_empty() {
861 continue;
862 }
863 user_parts.push(Part {
864 id: format!("{}.p{}", user_id, user_parts.len()),
865 kind: PartKind::Text,
866 content: text,
867 attachment: None,
868 tool_call_id: None,
869 tool_name: None,
870 tool_replay: None,
871 prune_state: PruneState::Intact,
872 reasoning_meta: None,
873 response_meta: None,
874 });
875 }
876 NormalizedItem::Image(reference) => {
877 user_parts.push(Part {
878 id: format!("{}.p{}", user_id, user_parts.len()),
879 kind: PartKind::Image,
880 content: String::new(),
881 attachment: Some(crate::session_model::message::PartAttachment {
882 reference,
883 }),
884 tool_call_id: None,
885 tool_name: None,
886 tool_replay: None,
887 prune_state: PruneState::Intact,
888 reasoning_meta: None,
889 response_meta: None,
890 });
891 }
892 }
893 }
894 if user_parts.is_empty() && initial_turn_causes.is_empty() {
895 user_parts.push(Part {
896 id: format!("{}.p0", user_id),
897 kind: PartKind::Text,
898 content: String::new(),
899 attachment: None,
900 tool_call_id: None,
901 tool_name: None,
902 tool_replay: None,
903 prune_state: PruneState::Intact,
904 reasoning_meta: None,
905 response_meta: None,
906 });
907 }
908 if !user_parts.is_empty() {
909 reassign_part_ids(&user_id, &mut user_parts);
910 turn_delta.push(Message {
911 id: user_id.clone(),
912 role: MessageRole::User,
913 parts: shared_parts(user_parts),
914 origin: None,
915 });
916 }
917
918 let manager = self
919 .runtime_session_services_for_turn(None)
920 .map_err(|err| {
921 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
922 })?;
923 let plugin_session = self
924 .session
925 .as_ref()
926 .map(|s| Arc::clone(s.plugins()))
927 .ok_or_else(|| {
928 RuntimeError::new(
929 RuntimeErrorCode::ContextPrepareTurn,
930 "runtime session not available",
931 )
932 })?;
933 let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
934 let prepare_phase_controller = scoped_child_turn_controller(
935 &scoped_effect_controller,
936 &self.state.session_id,
937 &prepare_phase_turn_id,
938 )?;
939 let turn_ctx = crate::TurnTransformContext {
940 session_id: self.state.session_id.clone(),
941 state: self.read_view(),
942 prompt_usage: previous_prompt_usage.clone(),
943 max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
944 sessions: manager.state_service(),
945 session_lifecycle: manager.lifecycle_service(),
946 session_graph: manager.graph_service(),
947 scoped_effect_controller: scoped_effect_controller.clone(),
948 direct_completions: manager.direct_completion_client(
949 RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
950 Some(prepare_phase_turn_id),
951 ),
952 };
953 self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
954 let prepared_context = plugin_session
955 .prepare_turn_context(
956 &turn_ctx,
957 crate::session_model::context::PreparedContext {
958 messages: crate::MessageSequence::from_base_and_delta(
959 base_messages,
960 turn_delta,
961 )
962 .with_base_render_cache(base_render_cache),
963 ..Default::default()
964 },
965 self.turn_phase_probe.clone(),
966 )
967 .await
968 .map_err(|err| {
969 RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
970 })?;
971 self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
972 drop(turn_ctx);
977 let messages = prepared_context.messages;
978 if let Some(session) = self.session.as_mut() {
979 session
980 .set_context_overlay(
981 prepared_context.tool_providers,
982 prepared_context.prompt_contributions,
983 prepared_context.include_base_tools,
984 )
985 .map_err(|err| {
986 RuntimeError::new(
987 RuntimeErrorCode::Other("session_tool_registry".to_string()),
988 err.to_string(),
989 )
990 })?;
991 }
992
993 self.state.last_prompt_usage = None;
994 Box::pin(self.stream_prepared_turn(
995 messages,
996 previous_prompt_usage,
997 input.protocol_turn_options.clone(),
998 input.protocol_extension.clone(),
999 input.turn_context.clone(),
1000 initial_turn_causes,
1001 trace_turn_id,
1002 turn_index,
1003 events,
1004 turn_events,
1005 scoped_effect_controller,
1006 cancel,
1007 queued_claim,
1008 ))
1009 .await
1010 }
1011
1012 pub async fn run_turn_assembled(
1014 &mut self,
1015 input: TurnInput,
1016 cancel: CancellationToken,
1017 scoped_effect_controller: ScopedEffectController<'_>,
1018 ) -> Result<AssembledTurn, RuntimeError> {
1019 self.stream_turn(input, TurnOptions::new(cancel, scoped_effect_controller))
1020 .await
1021 }
1022
1023 #[allow(clippy::too_many_arguments)]
1025 pub async fn stream_prepared_turn(
1026 &mut self,
1027 messages: crate::MessageSequence,
1028 _previous_prompt_usage: Option<PromptUsage>,
1029 protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1030 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1031 turn_context: crate::TurnContext,
1032 initial_turn_causes: Vec<crate::TurnCause>,
1033 trace_turn_id: String,
1034 turn_index: usize,
1035 events: &dyn EventSink,
1036 turn_events: &dyn TurnActivitySink,
1037 scoped_effect_controller: ScopedEffectController<'_>,
1038 cancel: CancellationToken,
1039 initial_queue_claim: Option<crate::QueuedWorkClaim>,
1040 ) -> Result<AssembledTurn, RuntimeError> {
1041 let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
1042 let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
1043 let mut turn_policy = self.state.effective_policy().clone();
1044 let turn_provider_override = turn_context.provider().cloned();
1045 if let Some(provider) = turn_provider_override.as_ref() {
1046 turn_policy.provider_id = provider.kind().to_string();
1047 }
1048 if let Some(model) = turn_context.model_spec() {
1049 turn_policy.model = model.clone();
1050 }
1051 let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
1052 let effective_protocol_turn_options = protocol_turn_options
1053 .clone()
1054 .map(|options| session_protocol_turn_options.merged_with_override(&options))
1055 .unwrap_or(session_protocol_turn_options);
1056 let manager = self
1057 .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1058 .map_err(|err| {
1059 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1060 })?;
1061 let plugins = {
1062 let session = self
1063 .session
1064 .as_ref()
1065 .expect("lash runtime session must be available");
1066 Arc::clone(session.plugins())
1067 };
1068 let mut assembler = TurnAssembler::new();
1069 self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
1070 let prepared = {
1076 let prepare_turn = plugins.prepare_turn_with_phase_probe(
1077 PrepareTurnRequest {
1078 session_id: self.state.session_id.clone(),
1079 state: crate::SessionReadView::from_runtime_state(
1080 &self.state,
1081 turn_policy.clone(),
1082 effective_protocol_turn_options.clone(),
1083 ),
1084 messages,
1085 sessions: manager.state_service(),
1086 session_lifecycle: manager.lifecycle_service(),
1087 session_graph: manager.graph_service(),
1088 turn_context: turn_context.clone(),
1089 },
1090 self.turn_phase_probe.clone(),
1091 );
1092 let mut prepare_turn = Box::pin(prepare_turn);
1093
1094 loop {
1095 tokio::select! {
1096 prepared = prepare_turn.as_mut() => {
1097 let prepared = prepared.map_err(|err| {
1098 RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
1099 })?;
1100 self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
1101 break prepared;
1102 }
1103 maybe_event = event_rx.recv() => {
1104 if let Some(event) = maybe_event {
1105 emit_runtime_stream_event_to_sinks(
1106 events,
1107 turn_events,
1108 event,
1109 &mut assembler,
1110 )
1111 .await;
1112 }
1113 }
1114 }
1115 }
1116 };
1117 for event in &prepared.events {
1118 assembler.push(event);
1119 }
1120 emit_session_events_to_sink(events, prepared.events).await;
1121 if let Some(abort) = prepared.abort {
1122 drop(event_tx);
1123
1124 let mut turn_pipeline = TurnBoundary::from_state(self.state.clone());
1125 turn_pipeline.apply_prepared_messages(&prepared.messages);
1126 let state = turn_pipeline.into_final_state();
1127 let issue = TurnIssue {
1128 kind: "plugin".to_string(),
1129 code: Some(abort.code),
1130 terminal_reason: None,
1131 message: abort.message.clone(),
1132 raw: None,
1133 };
1134 let error_event = SessionEvent::Error {
1135 message: abort.message,
1136 envelope: Some(crate::session_model::ErrorEnvelope {
1137 kind: "plugin".to_string(),
1138 code: issue.code.clone(),
1139 terminal_reason: None,
1140 user_message: issue.message.clone(),
1141 raw: None,
1142 }),
1143 };
1144 assembler.push(&error_event);
1145 emit_turn_activity_to_sink(
1146 turn_events,
1147 TurnActivity::independent(TurnEvent::Error {
1148 message: issue.message.clone(),
1149 }),
1150 )
1151 .await;
1152 emit_session_event_to_sink(events, error_event).await;
1153 let outcome_event = SessionEvent::TurnOutcome {
1154 outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
1155 };
1156 assembler.push(&outcome_event);
1157 emit_session_event_to_sink(events, outcome_event).await;
1158 assembler.push(&SessionEvent::Done);
1159 emit_session_event_to_sink(events, SessionEvent::Done).await;
1160 return Ok(assembler.finish(
1161 state.to_snapshot(),
1162 cancel.is_cancelled(),
1163 Some(issue),
1164 &self.host.core.control.termination,
1165 ));
1166 }
1167 let mut turn_pipeline = TurnBoundary::from_state(self.state.clone());
1168 let store = self
1169 .session
1170 .as_ref()
1171 .and_then(|session| session.history_store());
1172 let progress_store = if scoped_effect_controller.controller().durability_tier()
1176 == crate::DurabilityTier::Durable
1177 {
1178 None
1179 } else {
1180 store.as_ref().map(|store| store.as_ref())
1181 };
1182 turn_pipeline
1183 .prepared_checkpoint(
1184 progress_store,
1185 turn_policy.clone(),
1186 turn_index,
1187 &prepared.messages,
1188 self.session.as_mut(),
1189 )
1190 .await
1191 .map_err(|err| {
1192 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
1193 })?;
1194 let resolved_turn_policy = if let Some(provider) = turn_provider_override {
1195 RuntimeSessionPolicy::from_provider(turn_policy.clone(), provider)
1196 .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1197 } else {
1198 self.host
1199 .resolve_session_policy(&self.state.session_id, turn_policy.clone())
1200 .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1201 };
1202 let manager = self
1203 .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1204 .map_err(|err| {
1205 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1206 })?;
1207 let cancel_state = cancel.clone();
1208 let finish_scoped_effect_controller = scoped_effect_controller.clone();
1209 let session = self
1210 .session
1211 .take()
1212 .expect("lash runtime session must be available");
1213 let mut driver = RuntimeTurnDriver {
1214 session,
1215 policy: resolved_turn_policy,
1216 host: self.host.clone(),
1217 turn_id: scoped_effect_controller.scope_id().to_string(),
1218 scoped_effect_controller,
1219 session_id: self.state.session_id.clone(),
1220 turn_index,
1221 turn_pipeline,
1222 llm_stream_summaries: HashMap::new(),
1223 next_llm_ordinal: 0,
1224 session_services: manager,
1225 protocol_turn_options: effective_protocol_turn_options,
1226 protocol_extension,
1227 turn_context,
1228 turn_causes: initial_turn_causes,
1229 pending_queue_claims: initial_queue_claim.into_iter().collect(),
1230 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1231 turn_phase_probe: self.turn_phase_probe.clone(),
1232 };
1233 let protocol_run_offset = 0;
1234 self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
1235 let run_result = drive_turn_to_completion(
1236 driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
1237 &mut event_rx,
1238 &mut assembler,
1239 &child_usage_event_relay,
1240 events,
1241 turn_events,
1242 )
1243 .await;
1244 let (new_messages, _new_protocol_iteration) = match run_result {
1245 Ok(result) => result,
1246 Err(err) => {
1247 self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1248 let RuntimeTurnDriver { session, .. } = driver;
1249 self.session = Some(session);
1250 return Err(err);
1251 }
1252 };
1253 self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1254 tracing::debug!(
1255 new_message_count = new_messages.len(),
1256 tool_call_count = assembler.tool_calls.len(),
1257 "runtime post-run_task"
1258 );
1259
1260 let RuntimeTurnDriver {
1261 session,
1262 policy,
1263 turn_pipeline,
1264 pending_queue_claims,
1265 ..
1266 } = driver;
1267 self.session = Some(session);
1268 self.finish_turn(
1269 TurnFinishInput {
1270 turn_pipeline,
1271 assembler,
1272 new_messages,
1273 policy,
1274 turn_index,
1275 queued_work_completions: pending_queue_claims
1276 .iter()
1277 .map(crate::QueuedWorkClaim::completion)
1278 .collect(),
1279 trace_turn_id,
1280 },
1281 events,
1282 &finish_scoped_effect_controller,
1283 &cancel_state,
1284 )
1285 .await
1286 }
1287 async fn normalize_input_items(
1288 &self,
1289 items: &[InputItem],
1290 image_blobs: &HashMap<String, Vec<u8>>,
1291 ) -> Result<Vec<NormalizedItem>, String> {
1292 normalize_input_items(
1293 items,
1294 image_blobs,
1295 self.host.core.durability.attachment_store.as_ref(),
1296 )
1297 .await
1298 }
1299}
1300
1301fn turn_input_from_text(text: String) -> TurnInput {
1302 TurnInput {
1303 items: vec![InputItem::Text { text }],
1304 image_blobs: HashMap::new(),
1305 protocol_turn_options: None,
1306 trace_turn_id: None,
1307 protocol_extension: None,
1308 turn_context: crate::TurnContext::default(),
1309 }
1310}
1311
1312fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
1313 if completed_turn_count == 0 {
1314 root_turn_id.to_string()
1315 } else {
1316 format!("{root_turn_id}:agent-frame:{completed_turn_count}")
1317 }
1318}
1319
1320pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1321 if input.protocol_extension.is_some() {
1322 return Err(RuntimeError::new(
1323 RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1324 "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1325 ));
1326 }
1327 input
1328 .turn_context
1329 .live_plugin_inputs()
1330 .durable_effect_rejection()?;
1331 Ok(())
1332}
1333
1334async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1335 if !events.is_noop() {
1336 events.emit(activity).await;
1337 }
1338}
1339
1340async fn drive_turn_to_completion<F>(
1350 run_future: F,
1351 event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1352 assembler: &mut TurnAssembler,
1353 child_usage_event_relay: &ChildUsageEventRelay,
1354 events: &dyn EventSink,
1355 turn_events: &dyn TurnActivitySink,
1356) -> Result<(crate::MessageSequence, usize), RuntimeError>
1357where
1358 F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1359{
1360 let run_result = {
1361 let mut run_future = Box::pin(run_future);
1362 loop {
1363 tokio::select! {
1364 maybe_event = event_rx.recv() => {
1365 if let Some(event) = maybe_event {
1366 emit_runtime_stream_event_to_sinks(
1367 events,
1368 turn_events,
1369 event,
1370 assembler,
1371 )
1372 .await;
1373 }
1374 }
1375 completed = run_future.as_mut() => {
1376 child_usage_event_relay.clear();
1377 break completed;
1378 }
1379 }
1380 }
1381 };
1382 while let Some(event) = event_rx.recv().await {
1383 emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1384 }
1385 run_result
1386}
1387
1388async fn emit_runtime_stream_event_to_sinks(
1389 events: &dyn EventSink,
1390 turn_events: &dyn TurnActivitySink,
1391 event: RuntimeStreamEvent,
1392 assembler: &mut TurnAssembler,
1393) {
1394 match event {
1395 RuntimeStreamEvent::Session(event) => {
1396 assembler.push(&event);
1397 emit_session_event_to_sink(events, event).await;
1398 }
1399 RuntimeStreamEvent::Turn(activity) => {
1400 assembler.push_turn_activity(&activity);
1401 emit_turn_activity_to_sink(turn_events, activity).await;
1402 }
1403 }
1404}
1405
1406#[cfg(test)]
1407mod tests {
1408 use super::agent_frame_follow_turn_id;
1409
1410 #[test]
1411 fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
1412 assert_eq!(agent_frame_follow_turn_id("root-turn", 0), "root-turn");
1413 assert_eq!(
1414 agent_frame_follow_turn_id("root-turn", 1),
1415 "root-turn:agent-frame:1"
1416 );
1417 assert_eq!(
1418 agent_frame_follow_turn_id("root-turn", 2),
1419 "root-turn:agent-frame:2"
1420 );
1421 }
1422}