1use super::*;
2
3fn trace_fields_from_outcome(
4 outcome: &TurnOutcome,
5) -> (
6 &'static str,
7 &'static str,
8 Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10 match outcome {
11 TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12 ("completed", "assistant_message", None)
13 }
14 TurnOutcome::Finished(TurnFinish::FinalValue { .. }) => ("completed", "final_value", None),
15 TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
16 TurnOutcome::AgentFrameSwitch { frame_id, .. } => (
17 "completed",
18 "agent_frame_switch",
19 Some(lash_trace::TraceAgentFrameSwitch {
20 frame_id: frame_id.clone(),
21 }),
22 ),
23 TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
24 }
25}
26
27fn trace_stop_reason(stop: &TurnStop) -> &'static str {
28 match stop {
29 TurnStop::Cancelled => "cancelled",
30 TurnStop::Incomplete => "incomplete",
31 TurnStop::InvalidInput => "invalid_input",
32 TurnStop::MaxTurns => "max_turns",
33 TurnStop::ToolFailure => "tool_failure",
34 TurnStop::ProviderError => "provider_error",
35 TurnStop::PluginAbort => "plugin_abort",
36 TurnStop::RuntimeError => "runtime_error",
37 TurnStop::SubmittedError { .. } => "submitted_error",
38 TurnStop::ToolError { .. } => "tool_error",
39 }
40}
41
42fn session_head_refresh_error(err: SessionError) -> RuntimeError {
43 RuntimeError::new(
44 RuntimeErrorCode::Other("session_head_refresh".to_string()),
45 err.to_string(),
46 )
47}
48
49#[derive(Clone, Copy)]
50enum SessionExecutionLeaseReleasePolicy {
51 FinalCommit,
52 KeepOnAgentFrameSwitch,
53}
54
55impl SessionExecutionLeaseReleasePolicy {
56 fn should_release(self, outcome: &TurnOutcome) -> bool {
57 match self {
58 Self::FinalCommit => true,
59 Self::KeepOnAgentFrameSwitch => {
60 !matches!(outcome, TurnOutcome::AgentFrameSwitch { .. })
61 }
62 }
63 }
64}
65
66fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
67 match payload {
68 crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
69 crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
70 }
71}
72
73fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
74 claim
75 .batches
76 .iter()
77 .map(|batch| batch.batch_id.clone())
78 .collect()
79}
80
/// Builds the id of a turn phase as `<parent_turn_id>:<phase>`.
fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
    [parent_turn_id, phase].join(":")
}
84
85fn scoped_child_turn_controller<'run>(
86 scoped_effect_controller: &'run ScopedEffectController<'_>,
87 session_id: &str,
88 turn_id: &str,
89) -> Result<ScopedEffectController<'run>, RuntimeError> {
90 ScopedEffectController::borrowed(
91 scoped_effect_controller.controller(),
92 ExecutionScope::turn(session_id, turn_id),
93 )
94}
95
96pub(in crate::runtime) fn queued_work_trace_payload(
97 boundary: crate::QueuedWorkClaimBoundary,
98 claim: &crate::QueuedWorkClaim,
99 causes: &[crate::TurnCause],
100) -> serde_json::Value {
101 serde_json::json!({
102 "boundary": boundary,
103 "claim_id": claim.claim_id,
104 "owner_id": claim.owner.owner_id,
105 "incarnation_id": claim.owner.incarnation_id,
106 "batch_ids": queued_work_batch_ids(claim),
107 "payload_types": claim.batches.iter()
108 .flat_map(|batch| batch.items.iter())
109 .map(|item| queued_work_payload_type(&item.payload))
110 .collect::<Vec<_>>(),
111 "causes": causes,
112 })
113}
114
115pub(in crate::runtime) fn queued_work_completion_trace_payload(
116 completions: &[crate::QueuedWorkCompletion],
117) -> serde_json::Value {
118 serde_json::json!({
119 "claims": completions.iter().map(|completion| {
120 serde_json::json!({
121 "session_id": completion.session_id,
122 "claim_id": completion.claim_id,
123 "batch_ids": completion.batch_ids,
124 })
125 }).collect::<Vec<_>>(),
126 })
127}
128
129async fn emit_queued_work_started_to_sink(
130 events: &dyn TurnActivitySink,
131 boundary: crate::QueuedWorkClaimBoundary,
132 claim: &crate::QueuedWorkClaim,
133 causes: Vec<crate::TurnCause>,
134) {
135 emit_turn_activity_to_sink(
136 events,
137 TurnActivity::independent(TurnEvent::QueuedWorkStarted {
138 boundary,
139 batch_ids: queued_work_batch_ids(claim),
140 causes,
141 }),
142 )
143 .await;
144}
145
146pub(in crate::runtime) async fn send_queued_work_started_event(
147 event_tx: &mpsc::Sender<RuntimeStreamEvent>,
148 boundary: crate::QueuedWorkClaimBoundary,
149 claim: &crate::QueuedWorkClaim,
150 causes: Vec<crate::TurnCause>,
151) {
152 send_turn_activity(
153 event_tx,
154 TurnActivityId::fresh(),
155 TurnEvent::QueuedWorkStarted {
156 boundary,
157 batch_ids: queued_work_batch_ids(claim),
158 causes,
159 },
160 )
161 .await;
162}
163
/// Inputs consumed by `LashRuntime::finish_turn`, bundled into one value so
/// the method does not need a separate parameter for each of them.
struct TurnFinishInput {
    /// Turn pipeline whose state is finalized, committed, and finally turned
    /// into the runtime's new state.
    turn_pipeline: TurnBoundary,
    /// Assembler holding the turn's token usage; it produces the assembled turn.
    assembler: TurnAssembler,
    /// Messages handed to the pipeline when the turn's read state is finalized.
    new_messages: crate::MessageSequence,
    /// Policy whose model id labels the turn's token-ledger entry and whose
    /// provider is used to normalize the last prompt usage.
    policy: RuntimeSessionPolicy,
    /// Turn index written into the pipeline state before assembly.
    turn_index: usize,
    /// Queued work completions passed to the final commit and, when
    /// non-empty, reported in the `queued_work.completed` trace event.
    queued_work_completions: Vec<crate::QueuedWorkCompletion>,
    /// Turn input completions passed to the final commit.
    turn_input_completions: Vec<crate::TurnInputCompletion>,
    /// Turn id attached to the trace events emitted for this turn.
    trace_turn_id: String,
}
174
175impl LashRuntime {
176 fn max_context_tokens(&self) -> usize {
177 self.state.effective_policy().context_window_tokens()
178 }
179
180 async fn claim_session_execution_lease(
181 &self,
182 cancel: CancellationToken,
183 busy_is_error: bool,
184 ) -> Result<Option<SessionExecutionLeaseGuard>, RuntimeError> {
185 let Some(store) = self
186 .session
187 .as_ref()
188 .and_then(|session| session.history_store())
189 else {
190 return Ok(None);
191 };
192 match SessionExecutionLeaseGuard::try_acquire(
193 store,
194 &self.state.session_id,
195 &self.runtime_lease_owner,
196 Arc::clone(&self.host.core.clock),
197 cancel,
198 )
199 .await
200 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?
201 {
202 Some(lease) => Ok(Some(lease)),
203 None if busy_is_error => Err(RuntimeError::new(
204 RuntimeErrorCode::SessionExecutionBusy,
205 format!(
206 "session `{}` is already executing on another runtime owner",
207 self.state.session_id
208 ),
209 )),
210 None => Ok(None),
211 }
212 }
213
214 async fn settle_session_execution_lease<T>(
215 &self,
216 guard: Option<&SessionExecutionLeaseGuard>,
217 result: Result<T, RuntimeError>,
218 ) -> Result<T, RuntimeError> {
219 match result {
220 Ok(value) => {
221 if let Some(guard) = guard {
222 guard.release_if_live().await.map_err(|err| {
223 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
224 })?;
225 }
226 Ok(value)
227 }
228 Err(err) => {
229 if err.code != RuntimeErrorCode::StoreCommitFailed
230 && let Some(guard) = guard
231 && let Err(release_err) = guard.release_if_live().await
232 {
233 tracing::warn!(
234 error = %release_err,
235 "failed to release session execution lease after runtime error"
236 );
237 }
238 Err(err)
239 }
240 }
241 }
242
243 async fn ensure_session_execution_lease_live(
244 &self,
245 guard: Option<&SessionExecutionLeaseGuard>,
246 ) -> Result<(), RuntimeError> {
247 let Some(guard) = guard else {
248 return Ok(());
249 };
250 guard.refresh_or_mark_lost().await.map_err(|err| {
251 RuntimeError::new(
252 RuntimeErrorCode::SessionExecutionLeaseLost,
253 format!(
254 "session execution lease for session `{}` was lost before commit: {err}",
255 self.state.session_id
256 ),
257 )
258 })
259 }
260
261 async fn abandon_queued_work_claims_after_lease_loss(
262 &self,
263 err: &RuntimeError,
264 claims: &[crate::QueuedWorkClaim],
265 ) {
266 if err.code != RuntimeErrorCode::SessionExecutionLeaseLost || claims.is_empty() {
267 return;
268 }
269 let Some(store) = self
270 .session
271 .as_ref()
272 .and_then(|session| session.history_store())
273 else {
274 return;
275 };
276 for claim in claims {
277 if let Err(abandon_err) = store.abandon_queued_work_claim(claim).await {
278 tracing::warn!(
279 error = %abandon_err,
280 session_id = %claim.session_id,
281 claim_id = %claim.claim_id,
282 "failed to abandon queued work claim after session execution lease loss"
283 );
284 }
285 }
286 }
287
288 async fn abandon_turn_input_claims_after_lease_loss(
289 &self,
290 err: &RuntimeError,
291 claims: &[crate::TurnInputClaim],
292 ) {
293 if err.code != RuntimeErrorCode::SessionExecutionLeaseLost || claims.is_empty() {
294 return;
295 }
296 let Some(store) = self
297 .session
298 .as_ref()
299 .and_then(|session| session.history_store())
300 else {
301 return;
302 };
303 for claim in claims {
304 if let Err(abandon_err) = store.abandon_turn_input_claim(claim).await {
305 tracing::warn!(
306 error = %abandon_err,
307 session_id = %claim.session_id,
308 claim_id = %claim.claim_id,
309 "failed to abandon turn input claim after session execution lease loss"
310 );
311 }
312 }
313 }
314
315 #[doc(hidden)]
316 pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
317 self.turn_phase_probe = Some(probe);
318 }
319
320 fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
321 if let Some(probe) = self.turn_phase_probe.as_ref() {
322 probe.begin(phase);
323 }
324 }
325
326 fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
327 if let Some(probe) = self.turn_phase_probe.as_ref() {
328 probe.end(phase);
329 }
330 }
331
332 async fn finish_turn(
333 &mut self,
334 finish: TurnFinishInput,
335 events: &dyn EventSink,
336 scoped_effect_controller: &ScopedEffectController<'_>,
337 cancel_state: &CancellationToken,
338 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
339 session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
340 ) -> Result<AssembledTurn, RuntimeError> {
341 let TurnFinishInput {
342 mut turn_pipeline,
343 assembler,
344 new_messages,
345 policy,
346 turn_index,
347 queued_work_completions,
348 turn_input_completions,
349 trace_turn_id,
350 } = finish;
351 self.policy = self.state.effective_policy().clone();
352 turn_pipeline.state_mut().policy = self.policy.clone();
353 turn_pipeline.state_mut().turn_index = turn_index;
354
355 let mut turn_usage_delta = {
356 let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
357 std::mem::take(&mut *ledger)
358 };
359 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
360 turn_usage_delta.push(TokenLedgerEntry {
361 source: "turn".to_string(),
362 model: policy.model.id.clone(),
363 usage: assembler.token_usage.clone(),
364 });
365 }
366 let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
367
368 turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
369 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
370 turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
371 }
372
373 let last_prompt_usage = assembler
374 .last_llm_usage()
375 .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
376 turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
377 let assembled_state = turn_pipeline.export_state_for_assembly();
378 let assembled = assembler.finish(
379 assembled_state,
380 cancel_state.is_cancelled(),
381 None,
382 &self.host.core.control.termination,
383 );
384
385 let Some(session) = self.session.as_ref() else {
386 self.state.apply_snapshot(&assembled.state);
387 self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
388 return Ok(assembled);
389 };
390 self.ensure_session_execution_lease_live(session_execution_lease)
391 .await?;
392
393 let plugins = Arc::clone(session.plugins());
394 let manager = match self.runtime_session_services_for_turn(None) {
395 Ok(manager) => manager,
396 Err(err) => {
397 return Err(RuntimeError::new(
398 RuntimeErrorCode::PluginSessionManager,
399 err.to_string(),
400 ));
401 }
402 };
403
404 self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
405 let finalized = match plugins
406 .finalize_turn_with_phase_probe(
407 assembled,
408 manager.state_service(),
409 manager.lifecycle_service(),
410 manager.graph_service(),
411 self.turn_phase_probe.clone(),
412 )
413 .await
414 {
415 Ok(finalized) => finalized,
416 Err(err) => {
417 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
418 return Err(RuntimeError::new(
419 RuntimeErrorCode::PluginFinalizeTurn,
420 err.to_string(),
421 ));
422 }
423 };
424 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
425 self.ensure_session_execution_lease_live(session_execution_lease)
426 .await?;
427
428 let mut returned_turn = finalized.turn;
429 let release_session_execution_lease =
430 session_execution_lease_release_policy.should_release(&returned_turn.outcome);
431 self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
432 self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
433 let queued_work_completion_trace = queued_work_completions.clone();
434 let pending_attachment_ids = self
435 .host
436 .core
437 .durability
438 .attachment_store
439 .pending_manifest_commit_ids();
440 if let Err(err) = turn_pipeline
441 .final_commit(
442 &mut returned_turn,
443 self.session.as_mut(),
444 &turn_usage_delta,
445 Some(&trace_turn_id),
446 queued_work_completions,
447 turn_input_completions,
448 cancel_state.is_cancelled().then(|| trace_turn_id.clone()),
449 pending_attachment_ids.clone(),
450 release_session_execution_lease
451 .then(|| session_execution_lease.map(SessionExecutionLeaseGuard::completion))
452 .flatten(),
453 )
454 .await
455 {
456 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
457 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
458 return Err(err);
459 }
460 if release_session_execution_lease && let Some(lease) = session_execution_lease {
461 lease.mark_released();
462 }
463 self.host
464 .core
465 .durability
466 .attachment_store
467 .mark_manifest_committed(&pending_attachment_ids);
468 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
469
470 emit_session_events_to_sink(events, finalized.events).await;
471 self.state = turn_pipeline.into_final_state();
472 if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
473 && let Some(session) = self.session.as_mut()
474 {
475 let protocol_session = Arc::clone(session.plugins().protocol_session());
476 let session_id = self.state.session_id.clone();
477 protocol_session
478 .restore_session(
479 crate::plugin::ProtocolSessionContext::new(session, &session_id),
480 &self.state,
481 )
482 .await
483 .map_err(|err| {
484 RuntimeError::new(
485 RuntimeErrorCode::Other("protocol_restore_session".to_string()),
486 err.to_string(),
487 )
488 })?;
489 }
490 if !queued_work_completion_trace.is_empty() {
491 crate::trace::emit_trace(
492 &self.host.core.tracing.trace_sink,
493 &self.host.core.tracing.trace_context,
494 lash_trace::TraceContext::default()
495 .for_session(returned_turn.state.session_id.clone())
496 .for_turn_index(returned_turn.state.turn_index)
497 .for_turn(trace_turn_id.clone()),
498 lash_trace::TraceEvent::Custom {
499 name: "queued_work.completed".to_string(),
500 payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
501 },
502 self.host.core.clock.as_ref(),
503 );
504 }
505 self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
506 self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
507 .await?;
508 self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
509 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
510
511 self.emit_completed_turn_trace(
512 &returned_turn.state,
513 &returned_turn.outcome,
514 &trace_turn_id,
515 );
516 Ok(returned_turn)
517 }
518
519 fn emit_completed_turn_trace(
520 &self,
521 state: &SessionSnapshot,
522 outcome: &TurnOutcome,
523 trace_turn_id: &str,
524 ) {
525 if self.host.core.tracing.trace_sink.is_none() {
526 return;
527 }
528
529 let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
530 crate::trace::emit_trace(
531 &self.host.core.tracing.trace_sink,
532 &self.host.core.tracing.trace_context,
533 lash_trace::TraceContext::default()
534 .for_session(state.session_id.clone())
535 .for_turn_index(state.turn_index)
536 .for_turn(trace_turn_id.to_string()),
537 lash_trace::TraceEvent::TurnCompleted {
538 status: status.to_string(),
539 done_reason: done_reason.to_string(),
540 agent_frame_switch,
541 },
542 self.host.core.clock.as_ref(),
543 );
544 }
545
546 async fn emit_turn_persisted_event(
547 &self,
548 returned_turn: &AssembledTurn,
549 scoped_effect_controller: &ScopedEffectController<'_>,
550 trace_turn_id: &str,
551 ) -> Result<(), RuntimeError> {
552 let Some(session) = self.session.as_ref() else {
553 return Ok(());
554 };
555 let Ok(manager) = self.runtime_session_services() else {
556 return Ok(());
557 };
558 let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
559 let phase_controller = scoped_child_turn_controller(
560 scoped_effect_controller,
561 &self.state.session_id,
562 &phase_turn_id,
563 )?;
564 let direct_completions = manager.direct_completion_client(
565 RuntimeEffectControllerHandle::borrowed(phase_controller),
566 Some(phase_turn_id),
567 );
568
569 session
570 .plugins()
571 .emit_runtime_event_with_phase_probe(
572 crate::PluginLifecycleEvent::TurnPersisted(Box::new(
573 crate::SessionStateChangedContext {
574 session_id: self.state.session_id.clone(),
575 state: crate::SessionReadView::from_snapshot(&returned_turn.state),
576 sessions: manager.state_service(),
577 session_graph: manager.graph_service(),
578 direct_completions,
579 },
580 )),
581 self.turn_phase_probe.clone(),
582 )
583 .await;
584 Ok(())
585 }
586
587 pub async fn stream_turn(
589 &mut self,
590 input: TurnInput,
591 opts: TurnOptions<'_>,
592 ) -> Result<AssembledTurn, RuntimeError> {
593 let cancel = opts.cancel.clone();
594 let session_execution_lease = self
595 .claim_session_execution_lease(cancel.clone(), true)
596 .await?;
597 let scoped_effect_controller = opts.scoped_effect_controller();
598 let result = self
599 .stream_turn_with_scoped_effect_controller_inner(
600 input,
601 opts.events_or_noop(),
602 opts.turn_events_or_noop(),
603 scoped_effect_controller,
604 cancel,
605 None,
606 None,
607 session_execution_lease.as_ref(),
608 SessionExecutionLeaseReleasePolicy::FinalCommit,
609 )
610 .await;
611 self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
612 .await
613 }
614
615 pub async fn stream_next_queued_work(
616 &mut self,
617 opts: TurnOptions<'_>,
618 ) -> Result<Option<AssembledTurn>, RuntimeError> {
619 self.stream_queued_work(opts, None).await
620 }
621
622 pub async fn stream_selected_queued_work(
623 &mut self,
624 opts: TurnOptions<'_>,
625 batch_ids: &[String],
626 ) -> Result<Option<AssembledTurn>, RuntimeError> {
627 self.stream_queued_work(opts, Some(batch_ids)).await
628 }
629
    /// Runs at most one turn from the session's durably queued input.
    ///
    /// Flow:
    /// 1. Claim the session execution lease; if it cannot be claimed, return
    ///    `Ok(None)` instead of an error (`busy_is_error` is `false`).
    /// 2. Drain pending session commands first when specific batches were
    ///    selected, or when the earliest session command was enqueued before
    ///    the earliest pending turn input.
    /// 3. Without a batch selection, try to claim pending turn inputs and run
    ///    a turn from them.
    /// 4. Otherwise claim ready queued work (restricted to
    ///    `selected_batch_ids` when given), announce it through the turn
    ///    event sink and the trace sink, and run a turn from it.
    ///
    /// Returns `Ok(None)` when there was nothing to claim. When a turn fails
    /// because the lease was lost, the claim it was running is abandoned
    /// before the lease is settled.
    async fn stream_queued_work(
        &mut self,
        opts: TurnOptions<'_>,
        selected_batch_ids: Option<&[String]>,
    ) -> Result<Option<AssembledTurn>, RuntimeError> {
        let cancel = opts.cancel.clone();
        // A busy session is not an error here: the caller simply gets no turn.
        let Some(session_execution_lease) = self
            .claim_session_execution_lease(cancel.clone(), false)
            .await?
        else {
            return Ok(None);
        };
        let session_execution_fence = session_execution_lease.fence();
        let Some(store) = self
            .session
            .as_ref()
            .and_then(|session| session.history_store())
        else {
            // No store to claim work from: hand the lease back and report no turn.
            session_execution_lease
                .release_if_live()
                .await
                .map_err(|err| {
                    RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
                })?;
            return Ok(None);
        };
        // Explicitly selected batches always drain session commands first;
        // otherwise it depends on which was enqueued earlier.
        let drain_commands_before_turn_input = if selected_batch_ids.is_some() {
            true
        } else {
            self.session_commands_precede_pending_turn_input(store.as_ref())
                .await?
        };
        if drain_commands_before_turn_input {
            loop {
                match self
                    .drain_next_session_command(&session_execution_fence)
                    .await
                {
                    Ok(Some(_)) => {}
                    Ok(None) => break,
                    Err(err) => {
                        // Best-effort release; the drain error is what the caller sees.
                        let _ = session_execution_lease.release_if_live().await;
                        return Err(err);
                    }
                }
            }
        }
        // Pending turn inputs are only considered when no batches were selected.
        if selected_batch_ids.is_none() {
            let input_claim = store
                .claim_next_turn_inputs(
                    &self.state.session_id,
                    &session_execution_fence,
                    &self.runtime_lease_owner,
                    crate::TURN_INPUT_CLAIM_TTL_MS,
                    // NOTE(review): presumably the maximum number of inputs per claim — confirm.
                    64,
                )
                .await
                .map_err(super::runtime_error_from_store_commit)?;
            if let Some(input_claim) = input_claim {
                let mut input = input_claim.materialize_for_turn();
                // Prefer the trace turn id carried by the input, else the execution scope id.
                // The `or_else` always yields `Some`, so the UUID fallback is never taken.
                let turn_id = input
                    .trace_turn_id
                    .clone()
                    .or_else(|| Some(opts.execution_scope_id().to_owned()))
                    .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
                input.trace_turn_id = Some(turn_id.clone());
                crate::trace::emit_trace(
                    &self.host.core.tracing.trace_sink,
                    &self.host.core.tracing.trace_context,
                    lash_trace::TraceContext::default()
                        .for_session(self.state.session_id.clone())
                        // The turn has not run yet, so the trace refers to the upcoming index.
                        .for_turn_index(self.state.turn_index + 1)
                        .for_turn(turn_id.clone()),
                    lash_trace::TraceEvent::Custom {
                        name: "turn_input.claimed".to_string(),
                        payload: serde_json::json!({
                            "claim_id": &input_claim.claim_id,
                            "input_ids": input_claim.inputs.iter().map(|input| input.input_id.clone()).collect::<Vec<_>>(),
                        }),
                    },
                    self.host.core.clock.as_ref(),
                );
                // The claim is moved into the turn; keep a copy in case it must be abandoned.
                let claim_for_abandon = input_claim.clone();
                let scoped_effect_controller = opts.scoped_effect_controller();
                let result = self
                    .stream_turn_with_scoped_effect_controller_inner(
                        input,
                        opts.events_or_noop(),
                        opts.turn_events_or_noop(),
                        scoped_effect_controller,
                        cancel,
                        None,
                        Some(input_claim),
                        Some(&session_execution_lease),
                        SessionExecutionLeaseReleasePolicy::FinalCommit,
                    )
                    .await
                    .map(Some);
                if let Err(err) = &result {
                    // Only acts when the error is a lost session execution lease.
                    self.abandon_turn_input_claims_after_lease_loss(
                        err,
                        std::slice::from_ref(&claim_for_abandon),
                    )
                    .await;
                }
                return self
                    .settle_session_execution_lease(Some(&session_execution_lease), result)
                    .await;
            }
        }
        // No turn input was run: fall back to ready queued work.
        let claim = if let Some(batch_ids) = selected_batch_ids {
            store
                .claim_ready_queued_work_by_batch_ids(
                    &self.state.session_id,
                    &session_execution_fence,
                    &self.runtime_lease_owner,
                    crate::QueuedWorkClaimBoundary::Idle,
                    crate::QUEUED_WORK_CLAIM_TTL_MS,
                    batch_ids,
                )
                .await
        } else {
            store
                .claim_ready_queued_work(
                    &self.state.session_id,
                    &session_execution_fence,
                    &self.runtime_lease_owner,
                    crate::QueuedWorkClaimBoundary::Idle,
                    crate::QUEUED_WORK_CLAIM_TTL_MS,
                    // NOTE(review): presumably the maximum number of batches per claim — confirm.
                    64,
                )
                .await
        }
        .map_err(super::runtime_error_from_store_commit)?;
        let Some(claim) = claim else {
            // Nothing is ready: release the lease and report no turn.
            session_execution_lease
                .release_if_live()
                .await
                .map_err(|err| {
                    RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
                })?;
            return Ok(None);
        };
        let mut work = claim.materialize_for_turn();
        // Same trace turn id resolution as for turn inputs above.
        let turn_id = work
            .input
            .trace_turn_id
            .clone()
            .or_else(|| Some(opts.execution_scope_id().to_owned()))
            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
        work.input.trace_turn_id = Some(turn_id.clone());
        let causes = work.turn_causes.clone();
        emit_queued_work_started_to_sink(
            opts.turn_events_or_noop(),
            crate::QueuedWorkClaimBoundary::Idle,
            &claim,
            causes.clone(),
        )
        .await;
        crate::trace::emit_trace(
            &self.host.core.tracing.trace_sink,
            &self.host.core.tracing.trace_context,
            lash_trace::TraceContext::default()
                .for_session(self.state.session_id.clone())
                .for_turn_index(self.state.turn_index + 1)
                .for_turn(turn_id.clone()),
            lash_trace::TraceEvent::Custom {
                name: "queued_work.claimed".to_string(),
                payload: queued_work_trace_payload(
                    crate::QueuedWorkClaimBoundary::Idle,
                    &claim,
                    &causes,
                ),
            },
            self.host.core.clock.as_ref(),
        );
        // The claim is moved into the turn; keep a copy in case it must be abandoned.
        let claim_for_abandon = claim.clone();
        let scoped_effect_controller = opts.scoped_effect_controller();
        let result = self
            .stream_turn_with_scoped_effect_controller_inner(
                work.input,
                opts.events_or_noop(),
                opts.turn_events_or_noop(),
                scoped_effect_controller,
                cancel,
                Some(claim),
                None,
                Some(&session_execution_lease),
                SessionExecutionLeaseReleasePolicy::FinalCommit,
            )
            .await
            .map(Some);
        if let Err(err) = &result {
            // Only acts when the error is a lost session execution lease.
            self.abandon_queued_work_claims_after_lease_loss(
                err,
                std::slice::from_ref(&claim_for_abandon),
            )
            .await;
        }
        self.settle_session_execution_lease(Some(&session_execution_lease), result)
            .await
    }
832
833 async fn session_commands_precede_pending_turn_input(
834 &self,
835 store: &dyn crate::RuntimePersistence,
836 ) -> Result<bool, RuntimeError> {
837 let pending_inputs = store
838 .list_pending_turn_inputs(&self.state.session_id)
839 .await
840 .map_err(super::runtime_error_from_store_commit)?;
841 let earliest_input = pending_inputs
842 .iter()
843 .filter(|input| input.state.is_next_turn_pending())
844 .min_by_key(|input| (input.enqueued_at_ms, input.enqueue_seq));
845 let queued_work = store
846 .list_pending_queued_work(&self.state.session_id)
847 .await
848 .map_err(super::runtime_error_from_store_commit)?;
849 let earliest_command = queued_work
850 .iter()
851 .filter(|batch| batch.is_session_command_work())
852 .min_by_key(|batch| (batch.enqueued_at_ms, batch.enqueue_seq));
853 Ok(match (earliest_command, earliest_input) {
854 (Some(command), Some(input)) => command.enqueued_at_ms < input.enqueued_at_ms,
855 (Some(_), None) => true,
856 _ => false,
857 })
858 }
859
860 fn ensure_durable_store_facets_for_scope(
868 &self,
869 scoped_effect_controller: &ScopedEffectController<'_>,
870 ) -> Result<(), RuntimeError> {
871 if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
872 {
873 return Ok(());
874 }
875 if self
876 .host
877 .core
878 .durability
879 .attachment_store
880 .persistence()
881 .durability_tier()
882 != crate::DurabilityTier::Durable
883 {
884 return Err(RuntimeError::durable_store_required(
885 crate::DurableStoreFacet::AttachmentStore,
886 ));
887 }
888 if self
889 .host
890 .core
891 .durability
892 .process_env_store
893 .durability_tier()
894 != crate::DurabilityTier::Durable
895 {
896 return Err(RuntimeError::durable_store_required(
897 crate::DurableStoreFacet::ProcessEnvStore,
898 ));
899 }
900 if let Some(store) = self
901 .session
902 .as_ref()
903 .and_then(|session| session.history_store())
904 && store.durability_tier() != crate::DurabilityTier::Durable
905 {
906 return Err(RuntimeError::durable_store_required(
907 crate::DurableStoreFacet::SessionStore,
908 ));
909 }
910 if let Some(process_registry) = self.host.process_registry.as_ref()
911 && process_registry.durability_tier() != crate::DurabilityTier::Durable
912 {
913 return Err(RuntimeError::durable_store_required(
914 crate::DurableStoreFacet::ProcessRegistry,
915 ));
916 }
917 Ok(())
918 }
919
920 #[allow(clippy::too_many_arguments)]
921 async fn stream_turn_with_scoped_effect_controller_inner(
922 &mut self,
923 mut input: TurnInput,
924 events: &dyn EventSink,
925 turn_events: &dyn TurnActivitySink,
926 scoped_effect_controller: ScopedEffectController<'_>,
927 cancel: CancellationToken,
928 queued_claim: Option<crate::QueuedWorkClaim>,
929 turn_input_claim: Option<crate::TurnInputClaim>,
930 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
931 session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
932 ) -> Result<AssembledTurn, RuntimeError> {
933 if queued_claim.is_none() && turn_input_claim.is_none() {
934 if let Some(lease) = session_execution_lease {
935 while self
936 .drain_next_session_command(&lease.fence())
937 .await?
938 .is_some()
939 {}
940 } else if self
941 .session
942 .as_ref()
943 .and_then(|session| session.history_store())
944 .is_some()
945 {
946 return Err(RuntimeError::new(
947 RuntimeErrorCode::StoreCommitFailed,
948 "session command drain requires a session execution lease",
949 ));
950 }
951 }
952 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
953 && scoped_effect_controller
954 .execution_scope()
955 .validates_turn_trace_id()
956 && input_turn_id != scoped_effect_controller.scope_id()
957 {
958 return Err(RuntimeError::new(
959 RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
960 format!(
961 "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
962 scoped_effect_controller.scope_id()
963 ),
964 ));
965 }
966 self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
967 input
968 .trace_turn_id
969 .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
970 self.stream_turn_inner(
971 input.clone(),
972 events,
973 turn_events,
974 scoped_effect_controller,
975 cancel.clone(),
976 queued_claim,
977 turn_input_claim,
978 session_execution_lease,
979 session_execution_lease_release_policy,
980 )
981 .await
982 }
983
984 pub async fn stream_turn_with_agent_frames(
993 &mut self,
994 input: TurnInput,
995 opts: TurnOptions<'_>,
996 ) -> Result<AgentFrameRun, RuntimeError> {
997 let cancel = opts.cancel.clone();
998 let session_execution_lease = self
999 .claim_session_execution_lease(cancel.clone(), true)
1000 .await?;
1001 let scoped_effect_controller = opts.scoped_effect_controller();
1002 let result = self
1003 .stream_turn_with_agent_frames_inner(
1004 input,
1005 opts.events_or_noop(),
1006 opts.turn_events_or_noop(),
1007 scoped_effect_controller,
1008 cancel,
1009 session_execution_lease.as_ref(),
1010 )
1011 .await;
1012 self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
1013 .await
1014 }
1015
1016 async fn stream_turn_with_agent_frames_inner(
1017 &mut self,
1018 mut input: TurnInput,
1019 events: &dyn EventSink,
1020 turn_events: &dyn TurnActivitySink,
1021 scoped_effect_controller: ScopedEffectController<'_>,
1022 cancel: CancellationToken,
1023 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
1024 ) -> Result<AgentFrameRun, RuntimeError> {
1025 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
1026 && scoped_effect_controller
1027 .execution_scope()
1028 .validates_turn_trace_id()
1029 && input_turn_id != scoped_effect_controller.scope_id()
1030 {
1031 return Err(RuntimeError::new(
1032 RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
1033 format!(
1034 "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
1035 scoped_effect_controller.scope_id()
1036 ),
1037 ));
1038 }
1039 let follow_protocol_turn_options = input.protocol_turn_options.clone();
1040 let follow_turn_context = input.turn_context.clone();
1041 let follow_trace_turn_id = input
1042 .trace_turn_id
1043 .clone()
1044 .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
1045 input
1046 .trace_turn_id
1047 .get_or_insert(follow_trace_turn_id.clone());
1048 let mut turns = Vec::new();
1049 loop {
1050 let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
1051 input.trace_turn_id = Some(turn_trace_turn_id.clone());
1052 let turn_effect_controller = if turns.is_empty() {
1053 scoped_effect_controller.clone()
1054 } else {
1055 ScopedEffectController::borrowed(
1056 scoped_effect_controller.controller(),
1057 ExecutionScope::turn(&self.state.session_id, &turn_trace_turn_id),
1058 )?
1059 };
1060 let turn = self
1061 .stream_turn_with_scoped_effect_controller_inner(
1062 input,
1063 events,
1064 turn_events,
1065 turn_effect_controller,
1066 cancel.clone(),
1067 None,
1068 None,
1069 session_execution_lease,
1070 SessionExecutionLeaseReleasePolicy::KeepOnAgentFrameSwitch,
1071 )
1072 .await?;
1073 let switched_frame = match &turn.outcome {
1074 TurnOutcome::AgentFrameSwitch { frame_id, task } => {
1075 Some((frame_id.clone(), task.clone()))
1076 }
1077 _ => None,
1078 };
1079 turns.push(turn);
1080
1081 let Some((_frame_id, task)) = switched_frame else {
1082 return Ok(AgentFrameRun { turns });
1083 };
1084 input = turn_input_from_text(task);
1085 input.protocol_turn_options = follow_protocol_turn_options.clone();
1086 input.turn_context = follow_turn_context.clone();
1087 }
1088 }
1089
/// Runs the input-handling half of a turn and then delegates to
/// `stream_prepared_turn_inner`.
///
/// Steps, in order:
/// 1. Refresh the session graph from the store.
/// 2. If `input` carries no items and no image blobs, replace it with the
///    input materialized from `turn_input_claim`, then (if it is still empty)
///    with the one from `queued_claim`. The caller's `trace_turn_id` is kept
///    as a fallback when the substituted input has none.
/// 3. Validate the input: durable-effect restrictions when a history store is
///    attached, the protocol extension, and item normalization.
/// 4. Build the turn's message delta (turn-cause messages followed by the
///    user message) and run the plugins' context preparation over it.
/// 5. Install the prepared context overlay on the session and hand off.
///
/// # Errors
///
/// Returns a `RuntimeError` when the session graph refresh, durable-effect
/// validation, protocol-extension validation, session-service lookup, child
/// controller creation, context preparation, or overlay installation fails,
/// and propagates any error from `stream_prepared_turn_inner`.
///
/// A failure to normalize the input items is *not* an `Err`: it is reported
/// through the sinks and returned as an `Ok` turn assembled from an `Error`
/// event, a `TurnOutcome::Stopped(TurnStop::InvalidInput)` event and `Done`.
// The argument list mirrors `stream_prepared_turn_inner`, hence the lint allowance.
#[allow(clippy::too_many_arguments)]
async fn stream_turn_inner(
    &mut self,
    mut input: TurnInput,
    events: &dyn EventSink,
    turn_events: &dyn TurnActivitySink,
    scoped_effect_controller: ScopedEffectController<'_>,
    cancel: CancellationToken,
    queued_claim: Option<crate::QueuedWorkClaim>,
    turn_input_claim: Option<crate::TurnInputClaim>,
    session_execution_lease: Option<&SessionExecutionLeaseGuard>,
    session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
) -> Result<AssembledTurn, RuntimeError> {
    // Refresh the session graph before reading any state; a failure surfaces
    // with the `session_head_refresh` error code.
    self.refresh_session_graph_from_store()
        .await
        .map_err(session_head_refresh_error)?;
    // Remember the caller-supplied trace id: `input` may be replaced wholesale below.
    let input_trace_turn_id = input.trace_turn_id.clone();
    let queued_turn_work = queued_claim
        .as_ref()
        .map(crate::QueuedWorkClaim::materialize_for_turn);
    let pending_turn_input = turn_input_claim
        .as_ref()
        .map(crate::TurnInputClaim::materialize_for_turn);
    // A claimed pending turn input only replaces an input that has neither
    // items nor image blobs.
    if let Some(work) = pending_turn_input.as_ref()
        && input.items.is_empty()
        && input.image_blobs.is_empty()
    {
        input = work.clone();
        if input.trace_turn_id.is_none() {
            input.trace_turn_id = input_trace_turn_id.clone();
        }
    }
    // Queued work is checked second, so it only applies when the input is
    // still empty after the pending-input substitution above.
    if let Some(work) = queued_turn_work.as_ref()
        && input.items.is_empty()
        && input.image_blobs.is_empty()
    {
        input = work.input.clone();
        if input.trace_turn_id.is_none() {
            input.trace_turn_id = input_trace_turn_id;
        }
    }
    // With a history store attached, the input must pass the durable-effect
    // checks in `ensure_durable_effect_input`.
    if self
        .session
        .as_ref()
        .and_then(|session| session.history_store())
        .is_some()
    {
        ensure_durable_effect_input(&input)?;
    }
    // Let the plugin protocol session validate the per-turn extension, if any.
    if let Some(extension) = &input.protocol_extension
        && let Some(session) = self.session.as_ref()
    {
        let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
        protocol_session
            .validate_turn_extension(extension)
            .await
            .map_err(|err| {
                RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
            })?;
    }
    // Snapshot the prompt usage now; `state.last_prompt_usage` is cleared on
    // every path below.
    let previous_prompt_usage = self.state.last_prompt_usage.clone();
    let normalized = match self
        .normalize_input_items(&input.items, &input.image_blobs)
        .await
    {
        Ok(items) => items,
        Err(e) => {
            // Invalid input ends the turn with events rather than an `Err`:
            // Error -> TurnOutcome(Stopped(InvalidInput)) -> Done. Each event is
            // recorded in the assembler as well as emitted to the sinks.
            self.state.last_prompt_usage = None;
            let mut assembler = TurnAssembler::default();
            let error_event = SessionEvent::Error {
                message: e.clone(),
                envelope: Some(crate::session_model::ErrorEnvelope {
                    kind: "input_validation".to_string(),
                    code: Some("invalid_turn_input".to_string()),
                    terminal_reason: None,
                    user_message: e.clone(),
                    raw: None,
                }),
            };
            assembler.push(&error_event);
            emit_turn_activity_to_sink(
                turn_events,
                TurnActivity::independent(TurnEvent::Error { message: e }),
            )
            .await;
            emit_session_event_to_sink(events, error_event).await;
            let outcome_event = SessionEvent::TurnOutcome {
                outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
            };
            assembler.push(&outcome_event);
            emit_session_event_to_sink(events, outcome_event).await;
            assembler.push(&SessionEvent::Done);
            emit_session_event_to_sink(events, SessionEvent::Done).await;
            // NOTE(review): the `false` / `None` arguments look like "cancelled"
            // and "issue" (compare the plugin-abort path in
            // `stream_prepared_turn_inner`) - confirm against `TurnAssembler::finish`.
            return Ok(assembler.finish(
                self.state.to_snapshot(),
                false,
                None,
                &self.host.core.control.termination,
            ));
        }
    };
    let turn_index = self.state.turn_index + 1;
    // Without a trace id on the input, fall back to a fresh random UUID.
    let trace_turn_id = input
        .trace_turn_id
        .clone()
        .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
    // Build and emit the `TurnStarted` trace event only when a trace sink is configured.
    if self.host.core.tracing.trace_sink.is_some() {
        let mut trace_metadata = std::collections::BTreeMap::new();
        trace_metadata.insert(
            "input_item_count".to_string(),
            serde_json::json!(normalized.len()),
        );
        crate::trace::emit_trace(
            &self.host.core.tracing.trace_sink,
            &self.host.core.tracing.trace_context,
            lash_trace::TraceContext::default()
                .for_session(self.state.session_id.clone())
                .for_turn_index(turn_index)
                .for_turn(trace_turn_id.clone()),
            lash_trace::TraceEvent::TurnStarted {
                metadata: trace_metadata,
            },
            self.host.core.clock.as_ref(),
        );
    }

    // The existing conversation is the base; `turn_delta` collects what this
    // turn adds, starting with one message per turn cause of the queued work.
    let base_read_model = self.state.read_model();
    let base_messages = base_read_model.messages;
    let base_render_cache = base_read_model.prompt_render_cache;
    let mut turn_delta = Vec::new();
    let initial_turn_causes = queued_turn_work
        .as_ref()
        .map(|work| work.turn_causes.clone())
        .unwrap_or_default();
    turn_delta.extend(
        initial_turn_causes
            .iter()
            .map(crate::TurnCause::to_event_message),
    );

    // Convert normalized items into parts of a single user message. Part ids
    // are `<user_id>.p<index>`, indexed by the number of parts pushed so far.
    let user_id = fresh_message_id();
    let mut user_parts: Vec<Part> = Vec::new();
    for item in normalized {
        match item {
            NormalizedItem::Text(text) => {
                // Empty text items contribute no part.
                if text.is_empty() {
                    continue;
                }
                user_parts.push(Part {
                    id: format!("{}.p{}", user_id, user_parts.len()),
                    kind: PartKind::Text,
                    content: text,
                    attachment: None,
                    tool_call_id: None,
                    tool_name: None,
                    tool_replay: None,
                    prune_state: PruneState::Intact,
                    reasoning_meta: None,
                    response_meta: None,
                });
            }
            NormalizedItem::Image(reference) => {
                // Images carry their payload as an attachment reference; the
                // textual content stays empty.
                user_parts.push(Part {
                    id: format!("{}.p{}", user_id, user_parts.len()),
                    kind: PartKind::Image,
                    content: String::new(),
                    attachment: Some(crate::session_model::message::PartAttachment {
                        reference,
                    }),
                    tool_call_id: None,
                    tool_name: None,
                    tool_replay: None,
                    prune_state: PruneState::Intact,
                    reasoning_meta: None,
                    response_meta: None,
                });
            }
        }
    }
    // With neither input parts nor turn causes the delta would be empty, so a
    // single empty text part is added to still produce a user message.
    if user_parts.is_empty() && initial_turn_causes.is_empty() {
        user_parts.push(Part {
            id: format!("{}.p0", user_id),
            kind: PartKind::Text,
            content: String::new(),
            attachment: None,
            tool_call_id: None,
            tool_name: None,
            tool_replay: None,
            prune_state: PruneState::Intact,
            reasoning_meta: None,
            response_meta: None,
        });
    }
    if !user_parts.is_empty() {
        // NOTE(review): `reassign_part_ids` presumably renumbers the part ids
        // under `user_id` - confirm against its definition.
        reassign_part_ids(&user_id, &mut user_parts);
        turn_delta.push(Message {
            id: user_id.clone(),
            role: MessageRole::User,
            parts: shared_parts(user_parts),
            origin: None,
        });
    }

    let manager = self
        .runtime_session_services_for_turn(None)
        .map_err(|err| {
            RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
        })?;
    // A missing runtime session is reported as a context-preparation error.
    let plugin_session = self
        .session
        .as_ref()
        .map(|s| Arc::clone(s.plugins()))
        .ok_or_else(|| {
            RuntimeError::new(
                RuntimeErrorCode::ContextPrepareTurn,
                "runtime session not available",
            )
        })?;
    // Direct completions issued during context preparation run under a child
    // controller keyed by a dedicated "prepare-turn" phase id.
    let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
    let prepare_phase_controller = scoped_child_turn_controller(
        &scoped_effect_controller,
        &self.state.session_id,
        &prepare_phase_turn_id,
    )?;
    let turn_ctx = crate::TurnTransformContext {
        session_id: self.state.session_id.clone(),
        state: self.read_view(),
        prompt_usage: previous_prompt_usage.clone(),
        max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
        sessions: manager.state_service(),
        session_lifecycle: manager.lifecycle_service(),
        session_graph: manager.graph_service(),
        scoped_effect_controller: scoped_effect_controller.clone(),
        direct_completions: manager.direct_completion_client(
            RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
            Some(prepare_phase_turn_id),
        ),
    };
    // NOTE(review): if `prepare_turn_context` fails, the `?` below returns
    // before `mark_phase_end(ContextTransform)` runs, whereas the effect-loop
    // error path in `stream_prepared_turn_inner` ends its phase explicitly -
    // confirm whether an unclosed phase is intended here.
    self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
    let prepared_context = plugin_session
        .prepare_turn_context(
            &turn_ctx,
            crate::session_model::context::PreparedContext {
                messages: crate::MessageSequence::from_base_and_delta(
                    base_messages,
                    turn_delta,
                )
                .with_base_render_cache(base_render_cache),
                ..Default::default()
            },
            self.turn_phase_probe.clone(),
        )
        .await
        .map_err(|err| {
            RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
        })?;
    self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
    // NOTE(review): dropped explicitly, presumably so the context (which holds
    // `self.read_view()`) is gone before `self.session` is mutated below - confirm.
    drop(turn_ctx);
    let messages = prepared_context.messages;
    // Install the tool providers, prompt contributions and base-tool flag that
    // context preparation produced.
    if let Some(session) = self.session.as_mut() {
        session
            .set_context_overlay(
                prepared_context.tool_providers,
                prepared_context.prompt_contributions,
                prepared_context.include_base_tools,
            )
            .map_err(|err| {
                RuntimeError::new(
                    RuntimeErrorCode::Other("session_tool_registry".to_string()),
                    err.to_string(),
                )
            })?;
    }

    // The previous usage was captured above and is passed along explicitly.
    self.state.last_prompt_usage = None;
    // The prepared-turn future is boxed before being awaited; the lease and
    // its release policy are forwarded unchanged.
    Box::pin(self.stream_prepared_turn_inner(
        messages,
        previous_prompt_usage,
        input.protocol_turn_options.clone(),
        input.protocol_extension.clone(),
        input.turn_context.clone(),
        initial_turn_causes,
        trace_turn_id,
        turn_index,
        events,
        turn_events,
        scoped_effect_controller,
        cancel,
        queued_claim,
        turn_input_claim,
        session_execution_lease,
        session_execution_lease_release_policy,
    ))
    .await
}
1389
1390 pub async fn run_turn_assembled(
1392 &mut self,
1393 input: TurnInput,
1394 cancel: CancellationToken,
1395 scoped_effect_controller: ScopedEffectController<'_>,
1396 ) -> Result<AssembledTurn, RuntimeError> {
1397 self.stream_turn(input, TurnOptions::new(cancel, scoped_effect_controller))
1398 .await
1399 }
1400
1401 #[allow(clippy::too_many_arguments)]
1403 pub async fn stream_prepared_turn(
1404 &mut self,
1405 messages: crate::MessageSequence,
1406 previous_prompt_usage: Option<PromptUsage>,
1407 protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1408 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1409 turn_context: crate::TurnContext,
1410 initial_turn_causes: Vec<crate::TurnCause>,
1411 trace_turn_id: String,
1412 turn_index: usize,
1413 events: &dyn EventSink,
1414 turn_events: &dyn TurnActivitySink,
1415 scoped_effect_controller: ScopedEffectController<'_>,
1416 cancel: CancellationToken,
1417 initial_queue_claim: Option<crate::QueuedWorkClaim>,
1418 initial_turn_input_claim: Option<crate::TurnInputClaim>,
1419 ) -> Result<AssembledTurn, RuntimeError> {
1420 let session_execution_lease = self
1421 .claim_session_execution_lease(cancel.clone(), true)
1422 .await?;
1423 let result = self
1424 .stream_prepared_turn_inner(
1425 messages,
1426 previous_prompt_usage,
1427 protocol_turn_options,
1428 protocol_extension,
1429 turn_context,
1430 initial_turn_causes,
1431 trace_turn_id,
1432 turn_index,
1433 events,
1434 turn_events,
1435 scoped_effect_controller,
1436 cancel,
1437 initial_queue_claim,
1438 initial_turn_input_claim,
1439 session_execution_lease.as_ref(),
1440 SessionExecutionLeaseReleasePolicy::FinalCommit,
1441 )
1442 .await;
1443 self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
1444 .await
1445 }
1446
/// Drives an already-prepared message sequence through the plugin
/// `prepare_turn` hooks, the effect loop, and `finish_turn`.
///
/// Outline:
/// 1. Refuse to run without a session execution lease when a history store
///    is attached.
/// 2. Derive the effective policy and protocol options for this turn from
///    the session state plus the overrides in `turn_context` /
///    `protocol_turn_options`.
/// 3. Run the plugins' `prepare_turn`, forwarding stream events to the sinks
///    while it is pending. A plugin abort ends the turn with events and an
///    `Ok` result.
/// 4. Write the prepared checkpoint, resolve the provider policy, move the
///    session into a `RuntimeTurnDriver` and run it to completion.
/// 5. Put the session back on `self`, then finish the turn; if the run or
///    the finish fails, hand the pending claims to the
///    `abandon_*_claims_after_lease_loss` helpers.
///
/// `_previous_prompt_usage` is accepted but not read by this function.
///
/// # Errors
///
/// Returns a `RuntimeError` for a missing lease (`StoreCommitFailed`), a
/// session-service, plugin `prepare_turn`, checkpoint or provider-resolution
/// failure, and for any error from the driver run or `finish_turn`.
///
/// # Panics
///
/// Panics with "lash runtime session must be available" if `self.session` is
/// `None` at either point where the session is required.
// Mirrors the public `stream_prepared_turn` argument list plus the lease and its policy.
#[allow(clippy::too_many_arguments)]
async fn stream_prepared_turn_inner(
    &mut self,
    messages: crate::MessageSequence,
    _previous_prompt_usage: Option<PromptUsage>,
    protocol_turn_options: Option<crate::ProtocolTurnOptions>,
    protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
    turn_context: crate::TurnContext,
    initial_turn_causes: Vec<crate::TurnCause>,
    trace_turn_id: String,
    turn_index: usize,
    events: &dyn EventSink,
    turn_events: &dyn TurnActivitySink,
    scoped_effect_controller: ScopedEffectController<'_>,
    cancel: CancellationToken,
    initial_queue_claim: Option<crate::QueuedWorkClaim>,
    initial_turn_input_claim: Option<crate::TurnInputClaim>,
    session_execution_lease: Option<&SessionExecutionLeaseGuard>,
    session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
) -> Result<AssembledTurn, RuntimeError> {
    // A session backed by a history store must not run a prepared turn
    // without holding the execution lease.
    if session_execution_lease.is_none()
        && self
            .session
            .as_ref()
            .and_then(|session| session.history_store())
            .is_some()
    {
        return Err(RuntimeError::new(
            RuntimeErrorCode::StoreCommitFailed,
            "prepared turn requires a session execution lease",
        ));
    }
    let session_execution_fence =
        session_execution_lease.map(SessionExecutionLeaseGuard::fence);
    // Channel (capacity 100) carrying stream events from the driver and from
    // the child-usage relay back to this function, which forwards them to
    // the sinks.
    let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
    let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
    // Start from the session's effective policy and apply the per-turn
    // provider / model overrides carried by `turn_context`.
    let mut turn_policy = self.state.effective_policy().clone();
    let turn_provider_override = turn_context.provider().cloned();
    if let Some(provider) = turn_provider_override.as_ref() {
        turn_policy.provider_id = provider.kind().to_string();
    }
    if let Some(model) = turn_context.model_spec() {
        turn_policy.model = model.clone();
    }
    // Per-turn protocol options are merged over the session's options; with
    // no per-turn options the session's options are used as they are.
    let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
    let effective_protocol_turn_options = protocol_turn_options
        .clone()
        .map(|options| session_protocol_turn_options.merged_with_override(&options))
        .unwrap_or(session_protocol_turn_options);
    let manager = self
        .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
        .map_err(|err| {
            RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
        })?;
    let plugins = {
        // Panics if the runtime session is missing (see `# Panics`).
        let session = self
            .session
            .as_ref()
            .expect("lash runtime session must be available");
        Arc::clone(session.plugins())
    };
    let mut assembler = TurnAssembler::new();
    // NOTE(review): if `prepare_turn` fails, the `?` inside the loop below
    // returns without a matching `mark_phase_end(BeforeTurnHooks)`, unlike the
    // effect-loop error path further down - confirm this is intended.
    self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
    let prepared = {
        let prepare_turn = plugins.prepare_turn_with_phase_probe(
            PrepareTurnRequest {
                session_id: self.state.session_id.clone(),
                state: crate::SessionReadView::from_runtime_state(
                    &self.state,
                    turn_policy.clone(),
                    effective_protocol_turn_options.clone(),
                ),
                messages,
                sessions: manager.state_service(),
                session_lifecycle: manager.lifecycle_service(),
                session_graph: manager.graph_service(),
                turn_context: turn_context.clone(),
            },
            self.turn_phase_probe.clone(),
        );
        let mut prepare_turn = Box::pin(prepare_turn);

        // Poll the plugin preparation while forwarding any stream event that
        // arrives on the channel in the meantime.
        loop {
            tokio::select! {
                prepared = prepare_turn.as_mut() => {
                    let prepared = prepared.map_err(|err| {
                        RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
                    })?;
                    self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
                    break prepared;
                }
                maybe_event = event_rx.recv() => {
                    if let Some(event) = maybe_event {
                        emit_runtime_stream_event_to_sinks(
                            events,
                            turn_events,
                            event,
                            &mut assembler,
                        )
                        .await;
                    }
                }
            }
        }
    };
    // Events produced by the plugins are recorded first, then emitted.
    for event in &prepared.events {
        assembler.push(event);
    }
    emit_session_events_to_sink(events, prepared.events).await;
    if let Some(abort) = prepared.abort {
        // Plugin abort: the turn ends here with
        // Error -> TurnOutcome(Stopped(PluginAbort)) -> Done and an `Ok` result.
        // NOTE(review): `initial_queue_claim` / `initial_turn_input_claim` are
        // dropped on this path without being completed or abandoned - confirm
        // that is intended.
        drop(event_tx);

        // The returned snapshot comes from a copy of the state with the
        // prepared messages applied; `self.state` itself is not modified here.
        let mut turn_pipeline = TurnBoundary::from_state_with_clock(
            self.state.clone(),
            Arc::clone(&self.host.core.clock),
        );
        turn_pipeline.apply_prepared_messages(&prepared.messages);
        let state = turn_pipeline.into_final_state();
        let issue = TurnIssue {
            kind: "plugin".to_string(),
            code: Some(abort.code),
            terminal_reason: None,
            message: abort.message.clone(),
            raw: None,
        };
        let error_event = SessionEvent::Error {
            message: abort.message,
            envelope: Some(crate::session_model::ErrorEnvelope {
                kind: "plugin".to_string(),
                code: issue.code.clone(),
                terminal_reason: None,
                user_message: issue.message.clone(),
                raw: None,
            }),
        };
        assembler.push(&error_event);
        emit_turn_activity_to_sink(
            turn_events,
            TurnActivity::independent(TurnEvent::Error {
                message: issue.message.clone(),
            }),
        )
        .await;
        emit_session_event_to_sink(events, error_event).await;
        let outcome_event = SessionEvent::TurnOutcome {
            outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
        };
        assembler.push(&outcome_event);
        emit_session_event_to_sink(events, outcome_event).await;
        assembler.push(&SessionEvent::Done);
        emit_session_event_to_sink(events, SessionEvent::Done).await;
        return Ok(assembler.finish(
            state.to_snapshot(),
            cancel.is_cancelled(),
            Some(issue),
            &self.host.core.control.termination,
        ));
    }
    // Normal path: the turn boundary works on a copy of the state and
    // carries the lease fence.
    let mut turn_pipeline = TurnBoundary::from_state_with_clock(
        self.state.clone(),
        Arc::clone(&self.host.core.clock),
    )
    .with_session_execution_lease(session_execution_fence.clone());
    let store = self
        .session
        .as_ref()
        .and_then(|session| session.history_store());
    // The history store is passed to the prepared checkpoint only when the
    // effect controller's durability tier is not `Durable`.
    let progress_store = if scoped_effect_controller.controller().durability_tier()
        == crate::DurabilityTier::Durable
    {
        None
    } else {
        store.as_ref().map(|store| store.as_ref())
    };
    turn_pipeline
        .prepared_checkpoint(
            progress_store,
            turn_policy.clone(),
            turn_index,
            &prepared.messages,
            self.session.as_mut(),
        )
        .await
        .map_err(super::runtime_error_from_store_commit)?;
    // A per-turn provider override builds the policy directly from that
    // provider (given the host clock); otherwise the host resolves it for
    // this session. Both failures use the "llm_provider" error code.
    let resolved_turn_policy = if let Some(provider) = turn_provider_override {
        RuntimeSessionPolicy::from_provider(
            turn_policy.clone(),
            provider.with_clock(Arc::clone(&self.host.core.clock)),
        )
        .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
    } else {
        self.host
            .resolve_session_policy(&self.state.session_id, turn_policy.clone())
            .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
    };
    // A fresh services handle (shadowing the earlier `manager`) is moved
    // into the driver below.
    let manager = self
        .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
        .map_err(|err| {
            RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
        })?;
    // Clones kept for `finish_turn`: the originals are moved into the driver
    // and into `driver.run`.
    let cancel_state = cancel.clone();
    let finish_scoped_effect_controller = scoped_effect_controller.clone();
    // The session is moved out of `self` for the duration of the run; it is
    // put back on both the error and the success path below. No `?` may be
    // introduced between this `take()` and those restores.
    let session = self
        .session
        .take()
        .expect("lash runtime session must be available");
    let mut driver = RuntimeTurnDriver {
        session,
        policy: resolved_turn_policy,
        host: self.host.clone(),
        // The driver's turn id is the effect controller's scope id, not
        // `trace_turn_id`.
        turn_id: scoped_effect_controller.scope_id().to_string(),
        scoped_effect_controller,
        session_id: self.state.session_id.clone(),
        turn_index,
        turn_pipeline,
        llm_stream_summaries: HashMap::new(),
        next_llm_ordinal: 0,
        session_services: manager,
        protocol_turn_options: effective_protocol_turn_options,
        protocol_extension,
        turn_context,
        turn_causes: initial_turn_causes,
        pending_queue_claims: initial_queue_claim.into_iter().collect(),
        pending_turn_input_claims: initial_turn_input_claim.into_iter().collect(),
        checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
        session_execution_lease: session_execution_fence,
        runtime_lease_owner: self.runtime_lease_owner.clone(),
        turn_phase_probe: self.turn_phase_probe.clone(),
    };
    let protocol_run_offset = 0;
    self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
    // `event_tx` is moved into the run; `drive_turn_to_completion` forwards
    // events while the run is pending and drains the channel afterwards.
    let run_result = drive_turn_to_completion(
        driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
        &mut event_rx,
        &mut assembler,
        &child_usage_event_relay,
        events,
        turn_events,
    )
    .await;
    // The second element of the run result is not used here.
    let (new_messages, _new_protocol_iteration) = match run_result {
        Ok(result) => result,
        Err(err) => {
            // Failed run: close the phase, restore the session, pass the
            // claims still held by the driver to the abandon helpers, then
            // propagate the error.
            self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
            let RuntimeTurnDriver {
                session,
                pending_queue_claims,
                pending_turn_input_claims,
                ..
            } = driver;
            self.session = Some(session);
            self.abandon_queued_work_claims_after_lease_loss(&err, &pending_queue_claims)
                .await;
            self.abandon_turn_input_claims_after_lease_loss(&err, &pending_turn_input_claims)
                .await;
            return Err(err);
        }
    };
    self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
    tracing::debug!(
        new_message_count = new_messages.len(),
        tool_call_count = assembler.tool_calls.len(),
        "runtime post-run_task"
    );

    // Take the driver apart: the session returns to `self`, the rest feeds
    // `finish_turn`.
    let RuntimeTurnDriver {
        session,
        policy,
        turn_pipeline,
        pending_queue_claims,
        pending_turn_input_claims,
        ..
    } = driver;
    self.session = Some(session);
    // Copies of the claims, used only if `finish_turn` fails.
    let pending_queue_claims_for_abandon = pending_queue_claims.clone();
    let pending_turn_input_claims_for_abandon = pending_turn_input_claims.clone();
    let finish_result = self
        .finish_turn(
            TurnFinishInput {
                turn_pipeline,
                assembler,
                new_messages,
                policy,
                turn_index,
                // Each pending claim is turned into its completion record.
                queued_work_completions: pending_queue_claims
                    .iter()
                    .map(crate::QueuedWorkClaim::completion)
                    .collect(),
                turn_input_completions: pending_turn_input_claims
                    .iter()
                    .map(crate::TurnInputClaim::completion)
                    .collect(),
                trace_turn_id,
            },
            events,
            &finish_scoped_effect_controller,
            &cancel_state,
            session_execution_lease,
            session_execution_lease_release_policy,
        )
        .await;
    // NOTE(review): judging by their names, the helpers act only when the
    // error indicates a lost lease - confirm against their definitions.
    if let Err(err) = &finish_result {
        self.abandon_queued_work_claims_after_lease_loss(
            err,
            &pending_queue_claims_for_abandon,
        )
        .await;
        self.abandon_turn_input_claims_after_lease_loss(
            err,
            &pending_turn_input_claims_for_abandon,
        )
        .await;
    }
    finish_result
}
1770 async fn normalize_input_items(
1771 &self,
1772 items: &[InputItem],
1773 image_blobs: &HashMap<String, Vec<u8>>,
1774 ) -> Result<Vec<NormalizedItem>, String> {
1775 normalize_input_items(
1776 items,
1777 image_blobs,
1778 self.host.core.durability.attachment_store.as_ref(),
1779 )
1780 .await
1781 }
1782}
1783
1784fn turn_input_from_text(text: String) -> TurnInput {
1785 TurnInput {
1786 items: vec![InputItem::Text { text }],
1787 image_blobs: HashMap::new(),
1788 protocol_turn_options: None,
1789 trace_turn_id: None,
1790 protocol_extension: None,
1791 turn_context: crate::TurnContext::default(),
1792 }
1793}
1794
/// Derives the trace turn id for a turn in an agent-frame follow sequence.
///
/// The first turn (`completed_turn_count == 0`) reuses `root_turn_id`
/// unchanged; every later turn gets `"{root_turn_id}:agent-frame:{n}"`, where
/// `n` is the number of turns already completed.
fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
    match completed_turn_count {
        0 => root_turn_id.to_owned(),
        follow_index => format!("{root_turn_id}:agent-frame:{follow_index}"),
    }
}
1802
1803pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1804 if input.protocol_extension.is_some() {
1805 return Err(RuntimeError::new(
1806 RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1807 "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1808 ));
1809 }
1810 input
1811 .turn_context
1812 .live_plugin_inputs()
1813 .durable_effect_rejection()?;
1814 Ok(())
1815}
1816
1817async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1818 if !events.is_noop() {
1819 events.emit(activity).await;
1820 }
1821}
1822
1823async fn drive_turn_to_completion<F>(
1833 run_future: F,
1834 event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1835 assembler: &mut TurnAssembler,
1836 child_usage_event_relay: &ChildUsageEventRelay,
1837 events: &dyn EventSink,
1838 turn_events: &dyn TurnActivitySink,
1839) -> Result<(crate::MessageSequence, usize), RuntimeError>
1840where
1841 F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1842{
1843 let run_result = {
1844 let mut run_future = Box::pin(run_future);
1845 loop {
1846 tokio::select! {
1847 maybe_event = event_rx.recv() => {
1848 if let Some(event) = maybe_event {
1849 emit_runtime_stream_event_to_sinks(
1850 events,
1851 turn_events,
1852 event,
1853 assembler,
1854 )
1855 .await;
1856 }
1857 }
1858 completed = run_future.as_mut() => {
1859 child_usage_event_relay.clear();
1860 break completed;
1861 }
1862 }
1863 }
1864 };
1865 while let Some(event) = event_rx.recv().await {
1866 emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1867 }
1868 run_result
1869}
1870
1871async fn emit_runtime_stream_event_to_sinks(
1872 events: &dyn EventSink,
1873 turn_events: &dyn TurnActivitySink,
1874 event: RuntimeStreamEvent,
1875 assembler: &mut TurnAssembler,
1876) {
1877 match event {
1878 RuntimeStreamEvent::Session(event) => {
1879 assembler.push(&event);
1880 emit_session_event_to_sink(events, event).await;
1881 }
1882 RuntimeStreamEvent::Turn(activity) => {
1883 assembler.push_turn_activity(&activity);
1884 emit_turn_activity_to_sink(turn_events, activity).await;
1885 }
1886 }
1887}
1888
#[cfg(test)]
mod tests {
    use super::agent_frame_follow_turn_id;

    /// The root turn keeps its id; each follow-up turn gets a suffix built
    /// from the number of completed turns.
    #[test]
    fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
        let expectations = [
            (0, "root-turn"),
            (1, "root-turn:agent-frame:1"),
            (2, "root-turn:agent-frame:2"),
        ];
        for (completed_turn_count, expected) in expectations {
            assert_eq!(
                agent_frame_follow_turn_id("root-turn", completed_turn_count),
                expected
            );
        }
    }
}