1use super::*;
2
3fn trace_fields_from_outcome(
4 outcome: &TurnOutcome,
5) -> (
6 &'static str,
7 &'static str,
8 Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10 match outcome {
11 TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12 ("completed", "assistant_message", None)
13 }
14 TurnOutcome::Finished(TurnFinish::FinalValue { .. }) => ("completed", "final_value", None),
15 TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
16 TurnOutcome::AgentFrameSwitch { frame_id, .. } => (
17 "completed",
18 "agent_frame_switch",
19 Some(lash_trace::TraceAgentFrameSwitch {
20 frame_id: frame_id.clone(),
21 }),
22 ),
23 TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
24 }
25}
26
27fn trace_stop_reason(stop: &TurnStop) -> &'static str {
28 match stop {
29 TurnStop::Cancelled => "cancelled",
30 TurnStop::Incomplete => "incomplete",
31 TurnStop::InvalidInput => "invalid_input",
32 TurnStop::MaxTurns => "max_turns",
33 TurnStop::ToolFailure => "tool_failure",
34 TurnStop::ProviderError => "provider_error",
35 TurnStop::PluginAbort => "plugin_abort",
36 TurnStop::RuntimeError => "runtime_error",
37 TurnStop::SubmittedError { .. } => "submitted_error",
38 TurnStop::ToolError { .. } => "tool_error",
39 }
40}
41
42fn session_head_refresh_error(err: SessionError) -> RuntimeError {
43 RuntimeError::new(
44 RuntimeErrorCode::Other("session_head_refresh".to_string()),
45 err.to_string(),
46 )
47}
48
49#[derive(Clone, Copy)]
50enum SessionExecutionLeaseReleasePolicy {
51 FinalCommit,
52 KeepOnAgentFrameSwitch,
53}
54
55impl SessionExecutionLeaseReleasePolicy {
56 fn should_release(self, outcome: &TurnOutcome) -> bool {
57 match self {
58 Self::FinalCommit => true,
59 Self::KeepOnAgentFrameSwitch => {
60 !matches!(outcome, TurnOutcome::AgentFrameSwitch { .. })
61 }
62 }
63 }
64}
65
66fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
67 match payload {
68 crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
69 crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
70 }
71}
72
73fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
74 claim
75 .batches
76 .iter()
77 .map(|batch| batch.batch_id.clone())
78 .collect()
79}
80
81#[derive(Clone, Copy)]
91struct TurnStopwatch {
92 started: std::time::Instant,
93 started_at_ms: u64,
94}
95
96impl TurnStopwatch {
97 fn start(clock: &dyn crate::Clock) -> Self {
98 Self {
99 started: clock.now(),
100 started_at_ms: clock.timestamp_ms(),
101 }
102 }
103
104 fn stamp(&self, turn: &mut AssembledTurn, clock: &dyn crate::Clock) {
105 turn.execution.started_at_ms = self.started_at_ms;
106 turn.execution.duration_ms = clock
107 .now()
108 .saturating_duration_since(self.started)
109 .as_millis() as u64;
110 }
111}
112
/// Builds the id of a named phase nested under a parent turn, formatted as
/// `<parent_turn_id>:<phase>`.
fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
    [parent_turn_id, phase].join(":")
}
116
117fn scoped_child_turn_controller<'run>(
118 scoped_effect_controller: &'run ScopedEffectController<'_>,
119 session_id: &str,
120 turn_id: &str,
121) -> Result<ScopedEffectController<'run>, RuntimeError> {
122 ScopedEffectController::borrowed(
123 scoped_effect_controller.controller(),
124 ExecutionScope::turn(session_id, turn_id),
125 )
126}
127
128pub(in crate::runtime) fn queued_work_trace_payload(
129 boundary: crate::QueuedWorkClaimBoundary,
130 claim: &crate::QueuedWorkClaim,
131 causes: &[crate::TurnCause],
132) -> serde_json::Value {
133 serde_json::json!({
134 "boundary": boundary,
135 "claim_id": claim.claim_id,
136 "owner_id": claim.owner.owner_id,
137 "incarnation_id": claim.owner.incarnation_id,
138 "batch_ids": queued_work_batch_ids(claim),
139 "payload_types": claim.batches.iter()
140 .flat_map(|batch| batch.items.iter())
141 .map(|item| queued_work_payload_type(&item.payload))
142 .collect::<Vec<_>>(),
143 "causes": causes,
144 })
145}
146
147pub(in crate::runtime) fn queued_work_completion_trace_payload(
148 completions: &[crate::QueuedWorkCompletion],
149) -> serde_json::Value {
150 serde_json::json!({
151 "claims": completions.iter().map(|completion| {
152 serde_json::json!({
153 "session_id": completion.session_id,
154 "claim_id": completion.claim_id,
155 "batch_ids": completion.batch_ids,
156 })
157 }).collect::<Vec<_>>(),
158 })
159}
160
161async fn emit_queued_work_started_to_sink(
162 events: &dyn TurnActivitySink,
163 boundary: crate::QueuedWorkClaimBoundary,
164 claim: &crate::QueuedWorkClaim,
165 causes: Vec<crate::TurnCause>,
166) {
167 emit_turn_activity_to_sink(
168 events,
169 TurnActivity::independent(TurnEvent::QueuedWorkStarted {
170 boundary,
171 batch_ids: queued_work_batch_ids(claim),
172 causes,
173 }),
174 )
175 .await;
176}
177
178pub(in crate::runtime) async fn send_queued_work_started_event(
179 event_tx: &mpsc::Sender<RuntimeStreamEvent>,
180 boundary: crate::QueuedWorkClaimBoundary,
181 claim: &crate::QueuedWorkClaim,
182 causes: Vec<crate::TurnCause>,
183) {
184 send_turn_activity(
185 event_tx,
186 TurnActivityId::fresh(),
187 TurnEvent::QueuedWorkStarted {
188 boundary,
189 batch_ids: queued_work_batch_ids(claim),
190 causes,
191 },
192 )
193 .await;
194}
195
/// Inputs consumed by `LashRuntime::finish_turn` to assemble, finalize and
/// commit a turn.
struct TurnFinishInput {
    /// Per-turn pipeline; its state is updated, exported for assembly, passed
    /// through `final_commit`, and finally becomes the runtime state.
    turn_pipeline: TurnBoundary,
    /// Accumulated turn output and token usage; `finish` turns it into the
    /// assembled turn.
    assembler: TurnAssembler,
    /// Messages produced during the turn, handed to `finalize_turn_read_state`.
    new_messages: crate::MessageSequence,
    /// Policy the turn ran with; its model id labels the turn's token-ledger entry.
    policy: RuntimeSessionPolicy,
    /// Turn index written onto the pipeline state.
    turn_index: usize,
    /// Queued-work completions passed to the final commit (and traced afterwards
    /// when non-empty).
    queued_work_completions: Vec<crate::QueuedWorkCompletion>,
    /// Turn-input completions passed to the final commit.
    turn_input_completions: Vec<crate::TurnInputCompletion>,
    /// Turn id used for trace contexts and the final commit.
    trace_turn_id: String,
}
206
207impl LashRuntime {
208 fn max_context_tokens(&self) -> usize {
209 self.state.effective_policy().context_window_tokens()
210 }
211
212 async fn claim_session_execution_lease(
213 &self,
214 cancel: CancellationToken,
215 busy_is_error: bool,
216 ) -> Result<Option<SessionExecutionLeaseGuard>, RuntimeError> {
217 let Some(store) = self
218 .session
219 .as_ref()
220 .and_then(|session| session.history_store())
221 else {
222 return Ok(None);
223 };
224 match SessionExecutionLeaseGuard::try_acquire(
225 store,
226 &self.state.session_id,
227 &self.runtime_lease_owner,
228 self.host.core.control.lease_timings,
229 Arc::clone(&self.host.core.clock),
230 cancel,
231 )
232 .await
233 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?
234 {
235 Some(lease) => Ok(Some(lease)),
236 None if busy_is_error => Err(RuntimeError::new(
237 RuntimeErrorCode::SessionExecutionBusy,
238 format!(
239 "session `{}` is already executing on another runtime owner",
240 self.state.session_id
241 ),
242 )),
243 None => Ok(None),
244 }
245 }
246
247 async fn settle_session_execution_lease<T>(
248 &self,
249 guard: Option<&SessionExecutionLeaseGuard>,
250 result: Result<T, RuntimeError>,
251 ) -> Result<T, RuntimeError> {
252 match result {
253 Ok(value) => {
254 if let Some(guard) = guard {
255 guard.release_if_live().await.map_err(|err| {
256 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
257 })?;
258 }
259 Ok(value)
260 }
261 Err(err) => {
262 if err.code != RuntimeErrorCode::StoreCommitFailed
263 && let Some(guard) = guard
264 && let Err(release_err) = guard.release_if_live().await
265 {
266 tracing::warn!(
267 error = %release_err,
268 "failed to release session execution lease after runtime error"
269 );
270 }
271 Err(err)
272 }
273 }
274 }
275
276 async fn ensure_session_execution_lease_live(
277 &self,
278 guard: Option<&SessionExecutionLeaseGuard>,
279 ) -> Result<(), RuntimeError> {
280 let Some(guard) = guard else {
281 return Ok(());
282 };
283 guard.refresh_or_mark_lost().await.map_err(|err| {
284 RuntimeError::new(
285 RuntimeErrorCode::SessionExecutionLeaseLost,
286 format!(
287 "session execution lease for session `{}` was lost before commit: {err}",
288 self.state.session_id
289 ),
290 )
291 })
292 }
293
294 async fn abandon_queued_work_claims_after_lease_loss(
295 &self,
296 err: &RuntimeError,
297 claims: &[crate::QueuedWorkClaim],
298 ) {
299 if err.code != RuntimeErrorCode::SessionExecutionLeaseLost || claims.is_empty() {
300 return;
301 }
302 let Some(store) = self
303 .session
304 .as_ref()
305 .and_then(|session| session.history_store())
306 else {
307 return;
308 };
309 for claim in claims {
310 if let Err(abandon_err) = store.abandon_queued_work_claim(claim).await {
311 tracing::warn!(
312 error = %abandon_err,
313 session_id = %claim.session_id,
314 claim_id = %claim.claim_id,
315 "failed to abandon queued work claim after session execution lease loss"
316 );
317 }
318 }
319 }
320
321 async fn abandon_turn_input_claims_after_lease_loss(
322 &self,
323 err: &RuntimeError,
324 claims: &[crate::TurnInputClaim],
325 ) {
326 if err.code != RuntimeErrorCode::SessionExecutionLeaseLost || claims.is_empty() {
327 return;
328 }
329 let Some(store) = self
330 .session
331 .as_ref()
332 .and_then(|session| session.history_store())
333 else {
334 return;
335 };
336 for claim in claims {
337 if let Err(abandon_err) = store.abandon_turn_input_claim(claim).await {
338 tracing::warn!(
339 error = %abandon_err,
340 session_id = %claim.session_id,
341 claim_id = %claim.claim_id,
342 "failed to abandon turn input claim after session execution lease loss"
343 );
344 }
345 }
346 }
347
348 #[doc(hidden)]
349 pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
350 self.turn_phase_probe = Some(probe);
351 }
352
353 fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
354 if let Some(probe) = self.turn_phase_probe.as_ref() {
355 probe.begin(phase);
356 }
357 }
358
359 fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
360 if let Some(probe) = self.turn_phase_probe.as_ref() {
361 probe.end(phase);
362 }
363 }
364
365 async fn finish_turn(
366 &mut self,
367 finish: TurnFinishInput,
368 events: &dyn EventSink,
369 scoped_effect_controller: &ScopedEffectController<'_>,
370 cancel_state: &CancellationToken,
371 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
372 session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
373 ) -> Result<AssembledTurn, RuntimeError> {
374 let TurnFinishInput {
375 mut turn_pipeline,
376 assembler,
377 new_messages,
378 policy,
379 turn_index,
380 queued_work_completions,
381 turn_input_completions,
382 trace_turn_id,
383 } = finish;
384 self.policy = self.state.effective_policy().clone();
385 turn_pipeline.state_mut().policy = self.policy.clone();
386 turn_pipeline.state_mut().turn_index = turn_index;
387
388 let mut turn_usage_delta = {
389 let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
390 std::mem::take(&mut *ledger)
391 };
392 if assembler.token_usage.total() > 0 {
393 turn_usage_delta.push(TokenLedgerEntry {
394 source: "turn".to_string(),
395 model: policy.model.id.clone(),
396 usage: assembler.token_usage.clone(),
397 });
398 }
399 let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
400
401 turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
402 if assembler.token_usage.total() > 0 {
403 turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
404 }
405
406 let last_prompt_usage = assembler.last_llm_usage().and_then(normalize_prompt_usage);
407 turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
408 let assembled_state = turn_pipeline.export_state_for_assembly();
409 let assembled = assembler.finish(
410 assembled_state,
411 cancel_state.is_cancelled(),
412 None,
413 &self.host.core.control.termination,
414 );
415
416 let Some(session) = self.session.as_ref() else {
417 self.state.apply_snapshot(&assembled.state);
418 self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
419 return Ok(assembled);
420 };
421 self.ensure_session_execution_lease_live(session_execution_lease)
422 .await?;
423
424 let plugins = Arc::clone(session.plugins());
425 let manager = match self.runtime_session_services_for_turn(None) {
426 Ok(manager) => manager,
427 Err(err) => {
428 return Err(RuntimeError::new(
429 RuntimeErrorCode::PluginSessionManager,
430 err.to_string(),
431 ));
432 }
433 };
434
435 self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
436 let finalized = match plugins
437 .finalize_turn_with_phase_probe(
438 assembled,
439 manager.state_service(),
440 manager.lifecycle_service(),
441 manager.graph_service(),
442 self.turn_phase_probe.clone(),
443 )
444 .await
445 {
446 Ok(finalized) => finalized,
447 Err(err) => {
448 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
449 return Err(RuntimeError::new(
450 RuntimeErrorCode::PluginFinalizeTurn,
451 err.to_string(),
452 ));
453 }
454 };
455 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
456 self.ensure_session_execution_lease_live(session_execution_lease)
457 .await?;
458
459 let mut returned_turn = finalized.turn;
460 let release_session_execution_lease =
461 session_execution_lease_release_policy.should_release(&returned_turn.outcome);
462 self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
463 self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
464 let queued_work_completion_trace = queued_work_completions.clone();
465 let pending_attachment_ids = self
466 .host
467 .core
468 .durability
469 .attachment_store
470 .pending_manifest_commit_ids();
471 if let Err(err) = turn_pipeline
472 .final_commit(
473 &mut returned_turn,
474 self.session.as_mut(),
475 &turn_usage_delta,
476 Some(&trace_turn_id),
477 queued_work_completions,
478 turn_input_completions,
479 cancel_state.is_cancelled().then(|| trace_turn_id.clone()),
480 pending_attachment_ids.clone(),
481 release_session_execution_lease
482 .then(|| session_execution_lease.map(SessionExecutionLeaseGuard::completion))
483 .flatten(),
484 )
485 .await
486 {
487 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
488 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
489 return Err(err);
490 }
491 if release_session_execution_lease && let Some(lease) = session_execution_lease {
492 lease.mark_released();
493 }
494 self.host
495 .core
496 .durability
497 .attachment_store
498 .mark_manifest_committed(&pending_attachment_ids);
499 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
500
501 emit_session_events_to_sink(events, finalized.events).await;
502 self.state = turn_pipeline.into_final_state();
503 if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
504 && let Some(session) = self.session.as_mut()
505 {
506 let protocol_session = Arc::clone(session.plugins().protocol_session());
507 let session_id = self.state.session_id.clone();
508 protocol_session
509 .restore_session(
510 crate::plugin::ProtocolSessionContext::new(session, &session_id),
511 &self.state,
512 )
513 .await
514 .map_err(|err| {
515 RuntimeError::new(
516 RuntimeErrorCode::Other("protocol_restore_session".to_string()),
517 err.to_string(),
518 )
519 })?;
520 }
521 if !queued_work_completion_trace.is_empty() {
522 crate::trace::emit_trace(
523 &self.host.core.tracing.trace_sink,
524 &self.host.core.tracing.trace_context,
525 lash_trace::TraceContext::default()
526 .for_session(returned_turn.state.session_id.clone())
527 .for_turn_index(returned_turn.state.turn_index)
528 .for_turn(trace_turn_id.clone()),
529 lash_trace::TraceEvent::Custom {
530 name: "queued_work.completed".to_string(),
531 payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
532 },
533 self.host.core.clock.as_ref(),
534 );
535 }
536 self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
537 self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
538 .await?;
539 self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
540 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
541
542 self.emit_completed_turn_trace(
543 &returned_turn.state,
544 &returned_turn.outcome,
545 &trace_turn_id,
546 );
547 Ok(returned_turn)
548 }
549
550 fn emit_completed_turn_trace(
551 &self,
552 state: &SessionSnapshot,
553 outcome: &TurnOutcome,
554 trace_turn_id: &str,
555 ) {
556 if self.host.core.tracing.trace_sink.is_none() {
557 return;
558 }
559
560 let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
561 crate::trace::emit_trace(
562 &self.host.core.tracing.trace_sink,
563 &self.host.core.tracing.trace_context,
564 lash_trace::TraceContext::default()
565 .for_session(state.session_id.clone())
566 .for_turn_index(state.turn_index)
567 .for_turn(trace_turn_id.to_string()),
568 lash_trace::TraceEvent::TurnCompleted {
569 status: status.to_string(),
570 done_reason: done_reason.to_string(),
571 agent_frame_switch,
572 },
573 self.host.core.clock.as_ref(),
574 );
575 }
576
577 async fn emit_turn_persisted_event(
578 &self,
579 returned_turn: &AssembledTurn,
580 scoped_effect_controller: &ScopedEffectController<'_>,
581 trace_turn_id: &str,
582 ) -> Result<(), RuntimeError> {
583 let Some(session) = self.session.as_ref() else {
584 return Ok(());
585 };
586 let Ok(manager) = self.runtime_session_services() else {
587 return Ok(());
588 };
589 let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
590 let phase_controller = scoped_child_turn_controller(
591 scoped_effect_controller,
592 &self.state.session_id,
593 &phase_turn_id,
594 )?;
595 let direct_completions = manager.direct_completion_client(
596 RuntimeEffectControllerHandle::borrowed(phase_controller),
597 Some(phase_turn_id),
598 );
599
600 session
601 .plugins()
602 .emit_runtime_event_with_phase_probe(
603 crate::PluginLifecycleEvent::TurnPersisted(Box::new(
604 crate::SessionStateChangedContext {
605 session_id: self.state.session_id.clone(),
606 state: crate::SessionReadView::from_snapshot(&returned_turn.state),
607 sessions: manager.state_service(),
608 session_graph: manager.graph_service(),
609 direct_completions,
610 },
611 )),
612 self.turn_phase_probe.clone(),
613 )
614 .await;
615 Ok(())
616 }
617
618 pub async fn stream_turn(
620 &mut self,
621 input: TurnInput,
622 opts: TurnOptions<'_>,
623 ) -> Result<AssembledTurn, RuntimeError> {
624 let stopwatch = TurnStopwatch::start(self.host.core.clock.as_ref());
625 let cancel = opts.cancel.clone();
626 let session_execution_lease = self
627 .claim_session_execution_lease(cancel.clone(), true)
628 .await?;
629 let scoped_effect_controller = opts.scoped_effect_controller();
630 let result = self
631 .stream_turn_with_scoped_effect_controller_inner(
632 input,
633 opts.events_or_noop(),
634 opts.turn_events_or_noop(),
635 scoped_effect_controller,
636 cancel,
637 None,
638 None,
639 session_execution_lease.as_ref(),
640 SessionExecutionLeaseReleasePolicy::FinalCommit,
641 )
642 .await
643 .map(|mut turn| {
644 stopwatch.stamp(&mut turn, self.host.core.clock.as_ref());
645 turn
646 });
647 self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
648 .await
649 }
650
651 pub async fn stream_next_queued_work(
652 &mut self,
653 opts: TurnOptions<'_>,
654 ) -> Result<Option<AssembledTurn>, RuntimeError> {
655 self.stream_queued_work(opts, None).await
656 }
657
658 pub async fn stream_selected_queued_work(
659 &mut self,
660 opts: TurnOptions<'_>,
661 batch_ids: &[String],
662 ) -> Result<Option<AssembledTurn>, RuntimeError> {
663 self.stream_queued_work(opts, Some(batch_ids)).await
664 }
665
666 async fn stream_queued_work(
667 &mut self,
668 opts: TurnOptions<'_>,
669 selected_batch_ids: Option<&[String]>,
670 ) -> Result<Option<AssembledTurn>, RuntimeError> {
671 let stopwatch = TurnStopwatch::start(self.host.core.clock.as_ref());
672 let cancel = opts.cancel.clone();
673 let Some(session_execution_lease) = self
674 .claim_session_execution_lease(cancel.clone(), false)
675 .await?
676 else {
677 return Ok(None);
678 };
679 let session_execution_fence = session_execution_lease.fence();
680 let Some(store) = self
681 .session
682 .as_ref()
683 .and_then(|session| session.history_store())
684 else {
685 session_execution_lease
686 .release_if_live()
687 .await
688 .map_err(|err| {
689 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
690 })?;
691 return Ok(None);
692 };
693 let drain_commands_before_turn_input = if selected_batch_ids.is_some() {
694 true
695 } else {
696 self.session_commands_precede_pending_turn_input(store.as_ref())
697 .await?
698 };
699 if drain_commands_before_turn_input {
700 loop {
701 match self
702 .drain_next_session_command(&session_execution_fence)
703 .await
704 {
705 Ok(Some(_)) => {}
706 Ok(None) => break,
707 Err(err) => {
708 let _ = session_execution_lease.release_if_live().await;
709 return Err(err);
710 }
711 }
712 }
713 }
714 if selected_batch_ids.is_none() {
715 let input_claim = store
716 .claim_next_turn_inputs(
717 &self.state.session_id,
718 &session_execution_fence,
719 &self.runtime_lease_owner,
720 self.host.core.control.lease_timings.ttl_ms(),
721 64,
722 )
723 .await
724 .map_err(super::runtime_error_from_store_commit)?;
725 if let Some(input_claim) = input_claim {
726 let mut input = input_claim.materialize_for_turn();
727 let turn_id = input
728 .trace_turn_id
729 .clone()
730 .or_else(|| Some(opts.execution_scope_id().to_owned()))
731 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
732 input.trace_turn_id = Some(turn_id.clone());
733 crate::trace::emit_trace(
734 &self.host.core.tracing.trace_sink,
735 &self.host.core.tracing.trace_context,
736 lash_trace::TraceContext::default()
737 .for_session(self.state.session_id.clone())
738 .for_turn_index(self.state.turn_index + 1)
739 .for_turn(turn_id.clone()),
740 lash_trace::TraceEvent::Custom {
741 name: "turn_input.claimed".to_string(),
742 payload: serde_json::json!({
743 "claim_id": &input_claim.claim_id,
744 "input_ids": input_claim.inputs.iter().map(|input| input.input_id.clone()).collect::<Vec<_>>(),
745 }),
746 },
747 self.host.core.clock.as_ref(),
748 );
749 let claim_for_abandon = input_claim.clone();
750 let scoped_effect_controller = opts.scoped_effect_controller();
751 let result = self
752 .stream_turn_with_scoped_effect_controller_inner(
753 input,
754 opts.events_or_noop(),
755 opts.turn_events_or_noop(),
756 scoped_effect_controller,
757 cancel,
758 None,
759 Some(input_claim),
760 Some(&session_execution_lease),
761 SessionExecutionLeaseReleasePolicy::FinalCommit,
762 )
763 .await
764 .map(|mut turn| {
765 stopwatch.stamp(&mut turn, self.host.core.clock.as_ref());
766 Some(turn)
767 });
768 if let Err(err) = &result {
769 self.abandon_turn_input_claims_after_lease_loss(
770 err,
771 std::slice::from_ref(&claim_for_abandon),
772 )
773 .await;
774 }
775 return self
776 .settle_session_execution_lease(Some(&session_execution_lease), result)
777 .await;
778 }
779 }
780 let claim = if let Some(batch_ids) = selected_batch_ids {
781 store
782 .claim_ready_queued_work_by_batch_ids(
783 &self.state.session_id,
784 &session_execution_fence,
785 &self.runtime_lease_owner,
786 crate::QueuedWorkClaimBoundary::Idle,
787 self.host.core.control.lease_timings.ttl_ms(),
788 batch_ids,
789 )
790 .await
791 } else {
792 store
793 .claim_ready_queued_work(
794 &self.state.session_id,
795 &session_execution_fence,
796 &self.runtime_lease_owner,
797 crate::QueuedWorkClaimBoundary::Idle,
798 self.host.core.control.lease_timings.ttl_ms(),
799 64,
800 )
801 .await
802 }
803 .map_err(super::runtime_error_from_store_commit)?;
804 let Some(claim) = claim else {
805 session_execution_lease
806 .release_if_live()
807 .await
808 .map_err(|err| {
809 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
810 })?;
811 return Ok(None);
812 };
813 let mut work = claim.materialize_for_turn();
814 let turn_id = work
815 .input
816 .trace_turn_id
817 .clone()
818 .or_else(|| Some(opts.execution_scope_id().to_owned()))
819 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
820 work.input.trace_turn_id = Some(turn_id.clone());
821 let causes = work.turn_causes.clone();
822 emit_queued_work_started_to_sink(
823 opts.turn_events_or_noop(),
824 crate::QueuedWorkClaimBoundary::Idle,
825 &claim,
826 causes.clone(),
827 )
828 .await;
829 crate::trace::emit_trace(
830 &self.host.core.tracing.trace_sink,
831 &self.host.core.tracing.trace_context,
832 lash_trace::TraceContext::default()
833 .for_session(self.state.session_id.clone())
834 .for_turn_index(self.state.turn_index + 1)
835 .for_turn(turn_id.clone()),
836 lash_trace::TraceEvent::Custom {
837 name: "queued_work.claimed".to_string(),
838 payload: queued_work_trace_payload(
839 crate::QueuedWorkClaimBoundary::Idle,
840 &claim,
841 &causes,
842 ),
843 },
844 self.host.core.clock.as_ref(),
845 );
846 let claim_for_abandon = claim.clone();
847 let scoped_effect_controller = opts.scoped_effect_controller();
848 let result = self
849 .stream_turn_with_scoped_effect_controller_inner(
850 work.input,
851 opts.events_or_noop(),
852 opts.turn_events_or_noop(),
853 scoped_effect_controller,
854 cancel,
855 Some(claim),
856 None,
857 Some(&session_execution_lease),
858 SessionExecutionLeaseReleasePolicy::FinalCommit,
859 )
860 .await
861 .map(|mut turn| {
862 stopwatch.stamp(&mut turn, self.host.core.clock.as_ref());
863 Some(turn)
864 });
865 if let Err(err) = &result {
866 self.abandon_queued_work_claims_after_lease_loss(
867 err,
868 std::slice::from_ref(&claim_for_abandon),
869 )
870 .await;
871 }
872 self.settle_session_execution_lease(Some(&session_execution_lease), result)
873 .await
874 }
875
876 async fn session_commands_precede_pending_turn_input(
877 &self,
878 store: &dyn crate::RuntimePersistence,
879 ) -> Result<bool, RuntimeError> {
880 let pending_inputs = store
881 .list_pending_turn_inputs(&self.state.session_id)
882 .await
883 .map_err(super::runtime_error_from_store_commit)?;
884 let earliest_input = pending_inputs
885 .iter()
886 .filter(|input| input.state.is_next_turn_pending())
887 .min_by_key(|input| (input.enqueued_at_ms, input.enqueue_seq));
888 let queued_work = store
889 .list_pending_queued_work(&self.state.session_id)
890 .await
891 .map_err(super::runtime_error_from_store_commit)?;
892 let earliest_command = queued_work
893 .iter()
894 .filter(|batch| batch.is_session_command_work())
895 .min_by_key(|batch| (batch.enqueued_at_ms, batch.enqueue_seq));
896 Ok(match (earliest_command, earliest_input) {
897 (Some(command), Some(input)) => command.enqueued_at_ms < input.enqueued_at_ms,
898 (Some(_), None) => true,
899 _ => false,
900 })
901 }
902
903 fn ensure_durable_store_facets_for_scope(
911 &self,
912 scoped_effect_controller: &ScopedEffectController<'_>,
913 ) -> Result<(), RuntimeError> {
914 if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
915 {
916 return Ok(());
917 }
918 if self
919 .host
920 .core
921 .durability
922 .attachment_store
923 .persistence()
924 .durability_tier()
925 != crate::DurabilityTier::Durable
926 {
927 return Err(RuntimeError::durable_store_required(
928 crate::DurableStoreFacet::AttachmentStore,
929 ));
930 }
931 if self
932 .host
933 .core
934 .durability
935 .process_env_store
936 .durability_tier()
937 != crate::DurabilityTier::Durable
938 {
939 return Err(RuntimeError::durable_store_required(
940 crate::DurableStoreFacet::ProcessEnvStore,
941 ));
942 }
943 if let Some(store) = self
944 .session
945 .as_ref()
946 .and_then(|session| session.history_store())
947 && store.durability_tier() != crate::DurabilityTier::Durable
948 {
949 return Err(RuntimeError::durable_store_required(
950 crate::DurableStoreFacet::SessionStore,
951 ));
952 }
953 if let Some(process_registry) = self.host.process_registry.as_ref()
954 && process_registry.durability_tier() != crate::DurabilityTier::Durable
955 {
956 return Err(RuntimeError::durable_store_required(
957 crate::DurableStoreFacet::ProcessRegistry,
958 ));
959 }
960 Ok(())
961 }
962
963 #[allow(clippy::too_many_arguments)]
964 async fn stream_turn_with_scoped_effect_controller_inner(
965 &mut self,
966 mut input: TurnInput,
967 events: &dyn EventSink,
968 turn_events: &dyn TurnActivitySink,
969 scoped_effect_controller: ScopedEffectController<'_>,
970 cancel: CancellationToken,
971 queued_claim: Option<crate::QueuedWorkClaim>,
972 turn_input_claim: Option<crate::TurnInputClaim>,
973 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
974 session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
975 ) -> Result<AssembledTurn, RuntimeError> {
976 if queued_claim.is_none() && turn_input_claim.is_none() {
977 if let Some(lease) = session_execution_lease {
978 while self
979 .drain_next_session_command(&lease.fence())
980 .await?
981 .is_some()
982 {}
983 } else if self
984 .session
985 .as_ref()
986 .and_then(|session| session.history_store())
987 .is_some()
988 {
989 return Err(RuntimeError::new(
990 RuntimeErrorCode::StoreCommitFailed,
991 "session command drain requires a session execution lease",
992 ));
993 }
994 }
995 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
996 && scoped_effect_controller
997 .execution_scope()
998 .validates_turn_trace_id()
999 && input_turn_id != scoped_effect_controller.scope_id()
1000 {
1001 return Err(RuntimeError::new(
1002 RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
1003 format!(
1004 "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
1005 scoped_effect_controller.scope_id()
1006 ),
1007 ));
1008 }
1009 self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
1010 input
1011 .trace_turn_id
1012 .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
1013 self.stream_turn_inner(
1014 input.clone(),
1015 events,
1016 turn_events,
1017 scoped_effect_controller,
1018 cancel.clone(),
1019 queued_claim,
1020 turn_input_claim,
1021 session_execution_lease,
1022 session_execution_lease_release_policy,
1023 )
1024 .await
1025 }
1026
1027 pub async fn stream_turn_with_agent_frames(
1036 &mut self,
1037 input: TurnInput,
1038 opts: TurnOptions<'_>,
1039 ) -> Result<AgentFrameRun, RuntimeError> {
1040 let stopwatch = TurnStopwatch::start(self.host.core.clock.as_ref());
1041 let cancel = opts.cancel.clone();
1042 let session_execution_lease = self
1043 .claim_session_execution_lease(cancel.clone(), true)
1044 .await?;
1045 let scoped_effect_controller = opts.scoped_effect_controller();
1046 let result = self
1047 .stream_turn_with_agent_frames_inner(
1048 input,
1049 opts.events_or_noop(),
1050 opts.turn_events_or_noop(),
1051 scoped_effect_controller,
1052 cancel,
1053 session_execution_lease.as_ref(),
1054 stopwatch,
1055 )
1056 .await;
1057 self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
1058 .await
1059 }
1060
1061 #[allow(clippy::too_many_arguments)]
1062 async fn stream_turn_with_agent_frames_inner(
1063 &mut self,
1064 mut input: TurnInput,
1065 events: &dyn EventSink,
1066 turn_events: &dyn TurnActivitySink,
1067 scoped_effect_controller: ScopedEffectController<'_>,
1068 cancel: CancellationToken,
1069 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
1070 stopwatch: TurnStopwatch,
1071 ) -> Result<AgentFrameRun, RuntimeError> {
1072 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
1073 && scoped_effect_controller
1074 .execution_scope()
1075 .validates_turn_trace_id()
1076 && input_turn_id != scoped_effect_controller.scope_id()
1077 {
1078 return Err(RuntimeError::new(
1079 RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
1080 format!(
1081 "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
1082 scoped_effect_controller.scope_id()
1083 ),
1084 ));
1085 }
1086 let follow_protocol_turn_options = input.protocol_turn_options.clone();
1087 let follow_turn_context = input.turn_context.clone();
1088 let follow_trace_turn_id = input
1089 .trace_turn_id
1090 .clone()
1091 .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
1092 input
1093 .trace_turn_id
1094 .get_or_insert(follow_trace_turn_id.clone());
1095 let mut turns = Vec::new();
1096 loop {
1097 let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
1098 input.trace_turn_id = Some(turn_trace_turn_id.clone());
1099 let turn_effect_controller = if turns.is_empty() {
1100 scoped_effect_controller.clone()
1101 } else {
1102 ScopedEffectController::borrowed(
1103 scoped_effect_controller.controller(),
1104 ExecutionScope::turn(&self.state.session_id, &turn_trace_turn_id),
1105 )?
1106 };
1107 let frame_stopwatch = if turns.is_empty() {
1111 stopwatch
1112 } else {
1113 TurnStopwatch::start(self.host.core.clock.as_ref())
1114 };
1115 let mut turn = self
1116 .stream_turn_with_scoped_effect_controller_inner(
1117 input,
1118 events,
1119 turn_events,
1120 turn_effect_controller,
1121 cancel.clone(),
1122 None,
1123 None,
1124 session_execution_lease,
1125 SessionExecutionLeaseReleasePolicy::KeepOnAgentFrameSwitch,
1126 )
1127 .await?;
1128 frame_stopwatch.stamp(&mut turn, self.host.core.clock.as_ref());
1129 let switched_frame = match &turn.outcome {
1130 TurnOutcome::AgentFrameSwitch { frame_id, task } => {
1131 Some((frame_id.clone(), task.clone()))
1132 }
1133 _ => None,
1134 };
1135 turns.push(turn);
1136
1137 let Some((_frame_id, task)) = switched_frame else {
1138 return Ok(AgentFrameRun { turns });
1139 };
1140 input = turn_input_from_text(task);
1141 input.protocol_turn_options = follow_protocol_turn_options.clone();
1142 input.turn_context = follow_turn_context.clone();
1143 }
1144 }
1145
1146 #[allow(clippy::too_many_arguments)]
1147 async fn stream_turn_inner(
1148 &mut self,
1149 mut input: TurnInput,
1150 events: &dyn EventSink,
1151 turn_events: &dyn TurnActivitySink,
1152 scoped_effect_controller: ScopedEffectController<'_>,
1153 cancel: CancellationToken,
1154 queued_claim: Option<crate::QueuedWorkClaim>,
1155 turn_input_claim: Option<crate::TurnInputClaim>,
1156 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
1157 session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
1158 ) -> Result<AssembledTurn, RuntimeError> {
1159 self.refresh_session_graph_from_store()
1160 .await
1161 .map_err(session_head_refresh_error)?;
1162 let input_trace_turn_id = input.trace_turn_id.clone();
1163 let queued_turn_work = queued_claim
1164 .as_ref()
1165 .map(crate::QueuedWorkClaim::materialize_for_turn);
1166 let pending_turn_input = turn_input_claim
1167 .as_ref()
1168 .map(crate::TurnInputClaim::materialize_for_turn);
1169 if let Some(work) = pending_turn_input.as_ref()
1170 && input.items.is_empty()
1171 && input.image_blobs.is_empty()
1172 {
1173 input = work.clone();
1174 if input.trace_turn_id.is_none() {
1175 input.trace_turn_id = input_trace_turn_id.clone();
1176 }
1177 }
1178 if let Some(work) = queued_turn_work.as_ref()
1179 && input.items.is_empty()
1180 && input.image_blobs.is_empty()
1181 {
1182 input = work.input.clone();
1183 if input.trace_turn_id.is_none() {
1184 input.trace_turn_id = input_trace_turn_id;
1185 }
1186 }
1187 if self
1188 .session
1189 .as_ref()
1190 .and_then(|session| session.history_store())
1191 .is_some()
1192 {
1193 ensure_durable_effect_input(&input)?;
1194 }
1195 if let Some(extension) = &input.protocol_extension
1196 && let Some(session) = self.session.as_ref()
1197 {
1198 let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
1199 protocol_session
1200 .validate_turn_extension(extension)
1201 .await
1202 .map_err(|err| {
1203 RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
1204 })?;
1205 }
1206 let previous_prompt_usage = self.state.last_prompt_usage.clone();
1207 let normalized = match self
1208 .normalize_input_items(&input.items, &input.image_blobs)
1209 .await
1210 {
1211 Ok(items) => items,
1212 Err(e) => {
1213 self.state.last_prompt_usage = None;
1214 let mut assembler = TurnAssembler::default();
1215 let error_event = SessionEvent::Error {
1216 message: e.clone(),
1217 envelope: Some(crate::session_model::ErrorEnvelope {
1218 kind: "input_validation".to_string(),
1219 code: Some("invalid_turn_input".to_string()),
1220 terminal_reason: None,
1221 user_message: e.clone(),
1222 raw: None,
1223 retryable: Some(false),
1224 provider_failure_kind: None,
1225 }),
1226 };
1227 assembler.push(&error_event);
1228 emit_turn_activity_to_sink(
1229 turn_events,
1230 TurnActivity::independent(TurnEvent::Error { message: e }),
1231 )
1232 .await;
1233 emit_session_event_to_sink(events, error_event).await;
1234 let outcome_event = SessionEvent::TurnOutcome {
1235 outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
1236 };
1237 assembler.push(&outcome_event);
1238 emit_session_event_to_sink(events, outcome_event).await;
1239 assembler.push(&SessionEvent::Done);
1240 emit_session_event_to_sink(events, SessionEvent::Done).await;
1241 return Ok(assembler.finish(
1242 self.state.to_snapshot(),
1243 false,
1244 None,
1245 &self.host.core.control.termination,
1246 ));
1247 }
1248 };
1249 let turn_index = self.state.turn_index + 1;
1250 let trace_turn_id = input
1251 .trace_turn_id
1252 .clone()
1253 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1254 if self.host.core.tracing.trace_sink.is_some() {
1255 let mut trace_metadata = std::collections::BTreeMap::new();
1256 trace_metadata.insert(
1257 "input_item_count".to_string(),
1258 serde_json::json!(normalized.len()),
1259 );
1260 crate::trace::emit_trace(
1261 &self.host.core.tracing.trace_sink,
1262 &self.host.core.tracing.trace_context,
1263 lash_trace::TraceContext::default()
1264 .for_session(self.state.session_id.clone())
1265 .for_turn_index(turn_index)
1266 .for_turn(trace_turn_id.clone()),
1267 lash_trace::TraceEvent::TurnStarted {
1268 metadata: trace_metadata,
1269 },
1270 self.host.core.clock.as_ref(),
1271 );
1272 }
1273
1274 let base_read_model = self.state.read_model();
1275 let base_messages = base_read_model.messages;
1276 let base_render_cache = base_read_model.prompt_render_cache;
1277 let mut turn_delta = Vec::new();
1278 let initial_turn_causes = queued_turn_work
1279 .as_ref()
1280 .map(|work| work.turn_causes.clone())
1281 .unwrap_or_default();
1282 turn_delta.extend(
1283 initial_turn_causes
1284 .iter()
1285 .map(crate::TurnCause::to_event_message),
1286 );
1287
1288 let user_id = fresh_message_id();
1289 let mut user_parts: Vec<Part> = Vec::new();
1290 for item in normalized {
1291 match item {
1292 NormalizedItem::Text(text) => {
1293 if text.is_empty() {
1294 continue;
1295 }
1296 user_parts.push(Part {
1297 id: format!("{}.p{}", user_id, user_parts.len()),
1298 kind: PartKind::Text,
1299 content: text,
1300 attachment: None,
1301 tool_call_id: None,
1302 tool_name: None,
1303 tool_replay: None,
1304 prune_state: PruneState::Intact,
1305 reasoning_meta: None,
1306 response_meta: None,
1307 });
1308 }
1309 NormalizedItem::Image(reference) => {
1310 user_parts.push(Part {
1311 id: format!("{}.p{}", user_id, user_parts.len()),
1312 kind: PartKind::Image,
1313 content: String::new(),
1314 attachment: Some(crate::session_model::message::PartAttachment {
1315 reference,
1316 }),
1317 tool_call_id: None,
1318 tool_name: None,
1319 tool_replay: None,
1320 prune_state: PruneState::Intact,
1321 reasoning_meta: None,
1322 response_meta: None,
1323 });
1324 }
1325 }
1326 }
1327 if user_parts.is_empty() && initial_turn_causes.is_empty() {
1328 user_parts.push(Part {
1329 id: format!("{}.p0", user_id),
1330 kind: PartKind::Text,
1331 content: String::new(),
1332 attachment: None,
1333 tool_call_id: None,
1334 tool_name: None,
1335 tool_replay: None,
1336 prune_state: PruneState::Intact,
1337 reasoning_meta: None,
1338 response_meta: None,
1339 });
1340 }
1341 if !user_parts.is_empty() {
1342 reassign_part_ids(&user_id, &mut user_parts);
1343 turn_delta.push(Message {
1344 id: user_id.clone(),
1345 role: MessageRole::User,
1346 parts: shared_parts(user_parts),
1347 origin: None,
1348 });
1349 }
1350
1351 let manager = self
1352 .runtime_session_services_for_turn(None)
1353 .map_err(|err| {
1354 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1355 })?;
1356 let plugin_session = self
1357 .session
1358 .as_ref()
1359 .map(|s| Arc::clone(s.plugins()))
1360 .ok_or_else(|| {
1361 RuntimeError::new(
1362 RuntimeErrorCode::ContextPrepareTurn,
1363 "runtime session not available",
1364 )
1365 })?;
1366 let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
1367 let prepare_phase_controller = scoped_child_turn_controller(
1368 &scoped_effect_controller,
1369 &self.state.session_id,
1370 &prepare_phase_turn_id,
1371 )?;
1372 let turn_ctx = crate::TurnTransformContext {
1373 session_id: self.state.session_id.clone(),
1374 state: self.read_view(),
1375 prompt_usage: previous_prompt_usage.clone(),
1376 max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
1377 sessions: manager.state_service(),
1378 session_lifecycle: manager.lifecycle_service(),
1379 session_graph: manager.graph_service(),
1380 scoped_effect_controller: scoped_effect_controller.clone(),
1381 direct_completions: manager.direct_completion_client(
1382 RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
1383 Some(prepare_phase_turn_id),
1384 ),
1385 };
1386 self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
1387 let prepared_context = plugin_session
1388 .prepare_turn_context(
1389 &turn_ctx,
1390 crate::session_model::context::PreparedContext {
1391 messages: crate::MessageSequence::from_base_and_delta(
1392 base_messages,
1393 turn_delta,
1394 )
1395 .with_base_render_cache(base_render_cache),
1396 ..Default::default()
1397 },
1398 self.turn_phase_probe.clone(),
1399 )
1400 .await
1401 .map_err(|err| {
1402 RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
1403 })?;
1404 self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
1405 drop(turn_ctx);
1410 let messages = prepared_context.messages;
1411 if let Some(session) = self.session.as_mut() {
1412 session
1413 .set_context_overlay(
1414 prepared_context.tool_providers,
1415 prepared_context.prompt_contributions,
1416 prepared_context.include_base_tools,
1417 )
1418 .map_err(|err| {
1419 RuntimeError::new(
1420 RuntimeErrorCode::Other("session_tool_registry".to_string()),
1421 err.to_string(),
1422 )
1423 })?;
1424 }
1425
1426 self.state.last_prompt_usage = None;
1427 Box::pin(self.stream_prepared_turn_inner(
1428 messages,
1429 previous_prompt_usage,
1430 input.protocol_turn_options.clone(),
1431 input.protocol_extension.clone(),
1432 input.turn_context.clone(),
1433 initial_turn_causes,
1434 trace_turn_id,
1435 turn_index,
1436 events,
1437 turn_events,
1438 scoped_effect_controller,
1439 cancel,
1440 queued_claim,
1441 turn_input_claim,
1442 session_execution_lease,
1443 session_execution_lease_release_policy,
1444 ))
1445 .await
1446 }
1447
1448 pub async fn run_turn_assembled(
1450 &mut self,
1451 input: TurnInput,
1452 cancel: CancellationToken,
1453 scoped_effect_controller: ScopedEffectController<'_>,
1454 ) -> Result<AssembledTurn, RuntimeError> {
1455 self.stream_turn(input, TurnOptions::new(cancel, scoped_effect_controller))
1456 .await
1457 }
1458
1459 #[allow(clippy::too_many_arguments)]
1461 pub async fn stream_prepared_turn(
1462 &mut self,
1463 messages: crate::MessageSequence,
1464 previous_prompt_usage: Option<PromptUsage>,
1465 protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1466 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1467 turn_context: crate::TurnContext,
1468 initial_turn_causes: Vec<crate::TurnCause>,
1469 trace_turn_id: String,
1470 turn_index: usize,
1471 events: &dyn EventSink,
1472 turn_events: &dyn TurnActivitySink,
1473 scoped_effect_controller: ScopedEffectController<'_>,
1474 cancel: CancellationToken,
1475 initial_queue_claim: Option<crate::QueuedWorkClaim>,
1476 initial_turn_input_claim: Option<crate::TurnInputClaim>,
1477 ) -> Result<AssembledTurn, RuntimeError> {
1478 let stopwatch = TurnStopwatch::start(self.host.core.clock.as_ref());
1479 let session_execution_lease = self
1480 .claim_session_execution_lease(cancel.clone(), true)
1481 .await?;
1482 let result = self
1483 .stream_prepared_turn_inner(
1484 messages,
1485 previous_prompt_usage,
1486 protocol_turn_options,
1487 protocol_extension,
1488 turn_context,
1489 initial_turn_causes,
1490 trace_turn_id,
1491 turn_index,
1492 events,
1493 turn_events,
1494 scoped_effect_controller,
1495 cancel,
1496 initial_queue_claim,
1497 initial_turn_input_claim,
1498 session_execution_lease.as_ref(),
1499 SessionExecutionLeaseReleasePolicy::FinalCommit,
1500 )
1501 .await
1502 .map(|mut turn| {
1503 stopwatch.stamp(&mut turn, self.host.core.clock.as_ref());
1504 turn
1505 });
1506 self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
1507 .await
1508 }
1509
1510 #[allow(clippy::too_many_arguments)]
1511 async fn stream_prepared_turn_inner(
1512 &mut self,
1513 messages: crate::MessageSequence,
1514 _previous_prompt_usage: Option<PromptUsage>,
1515 protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1516 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1517 turn_context: crate::TurnContext,
1518 initial_turn_causes: Vec<crate::TurnCause>,
1519 trace_turn_id: String,
1520 turn_index: usize,
1521 events: &dyn EventSink,
1522 turn_events: &dyn TurnActivitySink,
1523 scoped_effect_controller: ScopedEffectController<'_>,
1524 cancel: CancellationToken,
1525 initial_queue_claim: Option<crate::QueuedWorkClaim>,
1526 initial_turn_input_claim: Option<crate::TurnInputClaim>,
1527 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
1528 session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
1529 ) -> Result<AssembledTurn, RuntimeError> {
1530 if session_execution_lease.is_none()
1531 && self
1532 .session
1533 .as_ref()
1534 .and_then(|session| session.history_store())
1535 .is_some()
1536 {
1537 return Err(RuntimeError::new(
1538 RuntimeErrorCode::StoreCommitFailed,
1539 "prepared turn requires a session execution lease",
1540 ));
1541 }
1542 let session_execution_fence =
1543 session_execution_lease.map(SessionExecutionLeaseGuard::fence);
1544 let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
1545 let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
1546 let mut turn_policy = self.state.effective_policy().clone();
1547 let turn_provider_override = turn_context.provider().cloned();
1548 if let Some(provider) = turn_provider_override.as_ref() {
1549 turn_policy.provider_id = provider.kind().to_string();
1550 }
1551 if let Some(model) = turn_context.model_spec() {
1552 turn_policy.model = model.clone();
1553 }
1554 let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
1555 let effective_protocol_turn_options = protocol_turn_options
1556 .clone()
1557 .map(|options| session_protocol_turn_options.merged_with_override(&options))
1558 .unwrap_or(session_protocol_turn_options);
1559 let manager = self
1560 .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1561 .map_err(|err| {
1562 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1563 })?;
1564 let plugins = {
1565 let session = self
1566 .session
1567 .as_ref()
1568 .expect("lash runtime session must be available");
1569 Arc::clone(session.plugins())
1570 };
1571 let mut assembler = TurnAssembler::new();
1572 self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
1573 let prepared = {
1579 let prepare_turn = plugins.prepare_turn_with_phase_probe(
1580 PrepareTurnRequest {
1581 session_id: self.state.session_id.clone(),
1582 state: crate::SessionReadView::from_runtime_state(
1583 &self.state,
1584 turn_policy.clone(),
1585 effective_protocol_turn_options.clone(),
1586 ),
1587 messages,
1588 sessions: manager.state_service(),
1589 session_lifecycle: manager.lifecycle_service(),
1590 session_graph: manager.graph_service(),
1591 turn_context: turn_context.clone(),
1592 },
1593 self.turn_phase_probe.clone(),
1594 );
1595 let mut prepare_turn = Box::pin(prepare_turn);
1596
1597 loop {
1598 tokio::select! {
1599 prepared = prepare_turn.as_mut() => {
1600 let prepared = prepared.map_err(|err| {
1601 RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
1602 })?;
1603 self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
1604 break prepared;
1605 }
1606 maybe_event = event_rx.recv() => {
1607 if let Some(event) = maybe_event {
1608 emit_runtime_stream_event_to_sinks(
1609 events,
1610 turn_events,
1611 event,
1612 &mut assembler,
1613 )
1614 .await;
1615 }
1616 }
1617 }
1618 }
1619 };
1620 for event in &prepared.events {
1621 assembler.push(event);
1622 }
1623 emit_session_events_to_sink(events, prepared.events).await;
1624 if let Some(abort) = prepared.abort {
1625 drop(event_tx);
1626
1627 let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1628 self.state.clone(),
1629 Arc::clone(&self.host.core.clock),
1630 );
1631 turn_pipeline.apply_prepared_messages(&prepared.messages);
1632 let state = turn_pipeline.into_final_state();
1633 let issue = TurnIssue {
1634 kind: "plugin".to_string(),
1635 code: Some(abort.code),
1636 terminal_reason: None,
1637 message: abort.message.clone(),
1638 raw: None,
1639 retryable: None,
1640 provider_failure_kind: None,
1641 };
1642 let error_event = SessionEvent::Error {
1643 message: abort.message,
1644 envelope: Some(crate::session_model::ErrorEnvelope {
1645 kind: "plugin".to_string(),
1646 code: issue.code.clone(),
1647 terminal_reason: None,
1648 user_message: issue.message.clone(),
1649 raw: None,
1650 retryable: None,
1651 provider_failure_kind: None,
1652 }),
1653 };
1654 assembler.push(&error_event);
1655 emit_turn_activity_to_sink(
1656 turn_events,
1657 TurnActivity::independent(TurnEvent::Error {
1658 message: issue.message.clone(),
1659 }),
1660 )
1661 .await;
1662 emit_session_event_to_sink(events, error_event).await;
1663 let outcome_event = SessionEvent::TurnOutcome {
1664 outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
1665 };
1666 assembler.push(&outcome_event);
1667 emit_session_event_to_sink(events, outcome_event).await;
1668 assembler.push(&SessionEvent::Done);
1669 emit_session_event_to_sink(events, SessionEvent::Done).await;
1670 return Ok(assembler.finish(
1671 state.to_snapshot(),
1672 cancel.is_cancelled(),
1673 Some(issue),
1674 &self.host.core.control.termination,
1675 ));
1676 }
1677 let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1678 self.state.clone(),
1679 Arc::clone(&self.host.core.clock),
1680 )
1681 .with_session_execution_lease(session_execution_fence.clone());
1682 let store = self
1683 .session
1684 .as_ref()
1685 .and_then(|session| session.history_store());
1686 let progress_store = if scoped_effect_controller.controller().durability_tier()
1690 == crate::DurabilityTier::Durable
1691 {
1692 None
1693 } else {
1694 store.as_ref().map(|store| store.as_ref())
1695 };
1696 turn_pipeline
1697 .prepared_checkpoint(
1698 progress_store,
1699 turn_policy.clone(),
1700 turn_index,
1701 &prepared.messages,
1702 self.session.as_mut(),
1703 )
1704 .await
1705 .map_err(super::runtime_error_from_store_commit)?;
1706 let resolved_turn_policy = if let Some(provider) = turn_provider_override {
1707 RuntimeSessionPolicy::from_provider(
1708 turn_policy.clone(),
1709 provider.with_clock(Arc::clone(&self.host.core.clock)),
1710 )
1711 .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1712 } else {
1713 self.host
1714 .resolve_session_policy(&self.state.session_id, turn_policy.clone())
1715 .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1716 };
1717 let manager = self
1718 .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1719 .map_err(|err| {
1720 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1721 })?;
1722 let cancel_state = cancel.clone();
1723 let finish_scoped_effect_controller = scoped_effect_controller.clone();
1724 let session = self
1725 .session
1726 .take()
1727 .expect("lash runtime session must be available");
1728 let mut driver = RuntimeTurnDriver {
1729 session,
1730 policy: resolved_turn_policy,
1731 host: self.host.clone(),
1732 turn_id: scoped_effect_controller.scope_id().to_string(),
1733 scoped_effect_controller,
1734 session_id: self.state.session_id.clone(),
1735 turn_index,
1736 turn_pipeline,
1737 llm_stream_summaries: HashMap::new(),
1738 next_llm_ordinal: 0,
1739 session_services: manager,
1740 protocol_turn_options: effective_protocol_turn_options,
1741 protocol_extension,
1742 turn_context,
1743 turn_causes: initial_turn_causes,
1744 pending_queue_claims: initial_queue_claim.into_iter().collect(),
1745 pending_turn_input_claims: initial_turn_input_claim.into_iter().collect(),
1746 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1747 session_execution_lease: session_execution_fence,
1748 runtime_lease_owner: self.runtime_lease_owner.clone(),
1749 turn_phase_probe: self.turn_phase_probe.clone(),
1750 };
1751 let protocol_run_offset = 0;
1752 self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
1753 let run_result = drive_turn_to_completion(
1754 driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
1755 &mut event_rx,
1756 &mut assembler,
1757 &child_usage_event_relay,
1758 events,
1759 turn_events,
1760 )
1761 .await;
1762 let (new_messages, _new_protocol_iteration) = match run_result {
1763 Ok(result) => result,
1764 Err(err) => {
1765 self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1766 let RuntimeTurnDriver {
1767 session,
1768 pending_queue_claims,
1769 pending_turn_input_claims,
1770 ..
1771 } = driver;
1772 self.session = Some(session);
1773 self.abandon_queued_work_claims_after_lease_loss(&err, &pending_queue_claims)
1774 .await;
1775 self.abandon_turn_input_claims_after_lease_loss(&err, &pending_turn_input_claims)
1776 .await;
1777 return Err(err);
1778 }
1779 };
1780 self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1781 tracing::debug!(
1782 new_message_count = new_messages.len(),
1783 tool_call_count = assembler.tool_calls.len(),
1784 "runtime post-run_task"
1785 );
1786
1787 let RuntimeTurnDriver {
1788 session,
1789 policy,
1790 turn_pipeline,
1791 pending_queue_claims,
1792 pending_turn_input_claims,
1793 ..
1794 } = driver;
1795 self.session = Some(session);
1796 let pending_queue_claims_for_abandon = pending_queue_claims.clone();
1797 let pending_turn_input_claims_for_abandon = pending_turn_input_claims.clone();
1798 let finish_result = self
1799 .finish_turn(
1800 TurnFinishInput {
1801 turn_pipeline,
1802 assembler,
1803 new_messages,
1804 policy,
1805 turn_index,
1806 queued_work_completions: pending_queue_claims
1807 .iter()
1808 .map(crate::QueuedWorkClaim::completion)
1809 .collect(),
1810 turn_input_completions: pending_turn_input_claims
1811 .iter()
1812 .map(crate::TurnInputClaim::completion)
1813 .collect(),
1814 trace_turn_id,
1815 },
1816 events,
1817 &finish_scoped_effect_controller,
1818 &cancel_state,
1819 session_execution_lease,
1820 session_execution_lease_release_policy,
1821 )
1822 .await;
1823 if let Err(err) = &finish_result {
1824 self.abandon_queued_work_claims_after_lease_loss(
1825 err,
1826 &pending_queue_claims_for_abandon,
1827 )
1828 .await;
1829 self.abandon_turn_input_claims_after_lease_loss(
1830 err,
1831 &pending_turn_input_claims_for_abandon,
1832 )
1833 .await;
1834 }
1835 finish_result
1836 }
1837 async fn normalize_input_items(
1838 &self,
1839 items: &[InputItem],
1840 image_blobs: &HashMap<String, Vec<u8>>,
1841 ) -> Result<Vec<NormalizedItem>, String> {
1842 normalize_input_items(
1843 items,
1844 image_blobs,
1845 self.host.core.durability.attachment_store.as_ref(),
1846 )
1847 .await
1848 }
1849}
1850
1851fn turn_input_from_text(text: String) -> TurnInput {
1852 TurnInput {
1853 items: vec![InputItem::Text { text }],
1854 image_blobs: HashMap::new(),
1855 protocol_turn_options: None,
1856 trace_turn_id: None,
1857 protocol_extension: None,
1858 turn_context: crate::TurnContext::default(),
1859 }
1860}
1861
/// Derives the trace turn id for an agent-frame turn.
///
/// The first turn (no completed turns yet) reuses `root_turn_id` unchanged. Later turns get
/// the suffix `:agent-frame:<completed_turn_count>`.
fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
    match completed_turn_count {
        0 => root_turn_id.to_owned(),
        count => format!("{root_turn_id}:agent-frame:{count}"),
    }
}
1869
1870pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1871 if input.protocol_extension.is_some() {
1872 return Err(RuntimeError::new(
1873 RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1874 "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1875 ));
1876 }
1877 input
1878 .turn_context
1879 .live_plugin_inputs()
1880 .durable_effect_rejection()?;
1881 Ok(())
1882}
1883
1884async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1885 if !events.is_noop() {
1886 events.emit(activity).await;
1887 }
1888}
1889
1890async fn drive_turn_to_completion<F>(
1900 run_future: F,
1901 event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1902 assembler: &mut TurnAssembler,
1903 child_usage_event_relay: &ChildUsageEventRelay,
1904 events: &dyn EventSink,
1905 turn_events: &dyn TurnActivitySink,
1906) -> Result<(crate::MessageSequence, usize), RuntimeError>
1907where
1908 F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1909{
1910 let run_result = {
1911 let mut run_future = Box::pin(run_future);
1912 loop {
1913 tokio::select! {
1914 maybe_event = event_rx.recv() => {
1915 if let Some(event) = maybe_event {
1916 emit_runtime_stream_event_to_sinks(
1917 events,
1918 turn_events,
1919 event,
1920 assembler,
1921 )
1922 .await;
1923 }
1924 }
1925 completed = run_future.as_mut() => {
1926 child_usage_event_relay.clear();
1927 break completed;
1928 }
1929 }
1930 }
1931 };
1932 while let Some(event) = event_rx.recv().await {
1933 emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1934 }
1935 run_result
1936}
1937
1938async fn emit_runtime_stream_event_to_sinks(
1939 events: &dyn EventSink,
1940 turn_events: &dyn TurnActivitySink,
1941 event: RuntimeStreamEvent,
1942 assembler: &mut TurnAssembler,
1943) {
1944 match event {
1945 RuntimeStreamEvent::Session(event) => {
1946 assembler.push(&event);
1947 emit_session_event_to_sink(events, event).await;
1948 }
1949 RuntimeStreamEvent::Turn(activity) => {
1950 emit_turn_activity_to_sink(turn_events, activity).await;
1951 }
1952 }
1953}
1954
#[cfg(test)]
mod tests {
    use super::agent_frame_follow_turn_id;

    /// The root turn keeps its id; each follow-on frame gets a distinct, predictable suffix.
    #[test]
    fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
        let cases = [
            (0, "root-turn"),
            (1, "root-turn:agent-frame:1"),
            (2, "root-turn:agent-frame:2"),
        ];
        for (completed_turn_count, expected) in cases {
            assert_eq!(
                agent_frame_follow_turn_id("root-turn", completed_turn_count),
                expected
            );
        }
    }
}