1use super::*;
2
3fn trace_fields_from_outcome(
4 outcome: &TurnOutcome,
5) -> (
6 &'static str,
7 &'static str,
8 Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10 match outcome {
11 TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12 ("completed", "assistant_message", None)
13 }
14 TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
15 ("completed", "submitted_value", None)
16 }
17 TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
18 TurnOutcome::AgentFrameSwitch { frame_id, .. } => (
19 "completed",
20 "agent_frame_switch",
21 Some(lash_trace::TraceAgentFrameSwitch {
22 frame_id: frame_id.clone(),
23 }),
24 ),
25 TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
26 }
27}
28
29fn trace_stop_reason(stop: &TurnStop) -> &'static str {
30 match stop {
31 TurnStop::Cancelled => "cancelled",
32 TurnStop::Incomplete => "incomplete",
33 TurnStop::InvalidInput => "invalid_input",
34 TurnStop::MaxTurns => "max_turns",
35 TurnStop::ToolFailure => "tool_failure",
36 TurnStop::ProviderError => "provider_error",
37 TurnStop::PluginAbort => "plugin_abort",
38 TurnStop::RuntimeError => "runtime_error",
39 TurnStop::SubmittedError { .. } => "submitted_error",
40 TurnStop::ToolError { .. } => "tool_error",
41 }
42}
43
44fn session_head_refresh_error(err: SessionError) -> RuntimeError {
45 RuntimeError::new(
46 RuntimeErrorCode::Other("session_head_refresh".to_string()),
47 err.to_string(),
48 )
49}
50
/// Selects whether a turn's final commit also releases the session execution
/// lease, based on the turn outcome (see `should_release`).
#[derive(Clone, Copy)]
enum SessionExecutionLeaseReleasePolicy {
    /// Release the lease with the final commit regardless of the outcome.
    FinalCommit,
    /// Release the lease with the final commit unless the outcome is an
    /// agent-frame switch, in which case the lease is kept.
    KeepOnAgentFrameSwitch,
}
56
57impl SessionExecutionLeaseReleasePolicy {
58 fn should_release(self, outcome: &TurnOutcome) -> bool {
59 match self {
60 Self::FinalCommit => true,
61 Self::KeepOnAgentFrameSwitch => {
62 !matches!(outcome, TurnOutcome::AgentFrameSwitch { .. })
63 }
64 }
65 }
66}
67
68fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
69 match payload {
70 crate::QueuedWorkPayload::TurnInput { .. } => "turn_input",
71 crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
72 crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
73 }
74}
75
76fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
77 claim
78 .batches
79 .iter()
80 .map(|batch| batch.batch_id.clone())
81 .collect()
82}
83
/// Builds the id of a turn phase as `<parent_turn_id>:<phase>`.
fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
    [parent_turn_id, phase].join(":")
}
87
88fn scoped_child_turn_controller<'run>(
89 scoped_effect_controller: &'run ScopedEffectController<'_>,
90 session_id: &str,
91 turn_id: &str,
92) -> Result<ScopedEffectController<'run>, RuntimeError> {
93 ScopedEffectController::borrowed(
94 scoped_effect_controller.controller(),
95 ExecutionScope::turn(session_id, turn_id),
96 )
97}
98
99pub(in crate::runtime) fn queued_work_trace_payload(
100 boundary: crate::QueuedWorkClaimBoundary,
101 claim: &crate::QueuedWorkClaim,
102 causes: &[crate::TurnCause],
103) -> serde_json::Value {
104 serde_json::json!({
105 "boundary": boundary,
106 "claim_id": claim.claim_id,
107 "owner_id": claim.owner_id,
108 "batch_ids": queued_work_batch_ids(claim),
109 "payload_types": claim.batches.iter()
110 .flat_map(|batch| batch.items.iter())
111 .map(|item| queued_work_payload_type(&item.payload))
112 .collect::<Vec<_>>(),
113 "causes": causes,
114 })
115}
116
117pub(in crate::runtime) fn queued_work_completion_trace_payload(
118 completions: &[crate::QueuedWorkCompletion],
119) -> serde_json::Value {
120 serde_json::json!({
121 "claims": completions.iter().map(|completion| {
122 serde_json::json!({
123 "session_id": completion.session_id,
124 "claim_id": completion.claim_id,
125 "batch_ids": completion.batch_ids,
126 })
127 }).collect::<Vec<_>>(),
128 })
129}
130
131async fn emit_queued_work_started_to_sink(
132 events: &dyn TurnActivitySink,
133 boundary: crate::QueuedWorkClaimBoundary,
134 claim: &crate::QueuedWorkClaim,
135 causes: Vec<crate::TurnCause>,
136) {
137 emit_turn_activity_to_sink(
138 events,
139 TurnActivity::independent(TurnEvent::QueuedWorkStarted {
140 boundary,
141 batch_ids: queued_work_batch_ids(claim),
142 causes,
143 }),
144 )
145 .await;
146}
147
148pub(in crate::runtime) async fn send_queued_work_started_event(
149 event_tx: &mpsc::Sender<RuntimeStreamEvent>,
150 boundary: crate::QueuedWorkClaimBoundary,
151 claim: &crate::QueuedWorkClaim,
152 causes: Vec<crate::TurnCause>,
153) {
154 send_turn_activity(
155 event_tx,
156 TurnActivityId::fresh(),
157 TurnEvent::QueuedWorkStarted {
158 boundary,
159 batch_ids: queued_work_batch_ids(claim),
160 causes,
161 },
162 )
163 .await;
164}
165
/// Everything `LashRuntime::finish_turn` needs from the turn loop, bundled so
/// the method takes one owned value instead of many parameters.
struct TurnFinishInput {
    /// Turn boundary whose state is finalized, committed, and then converted
    /// into the runtime's final state.
    turn_pipeline: TurnBoundary,
    /// Assembler for the turn; supplies the token usage and is finished into
    /// the `AssembledTurn`.
    assembler: TurnAssembler,
    /// Messages passed to `finalize_turn_read_state`.
    new_messages: crate::MessageSequence,
    /// Policy providing the model id for the token ledger entry and the
    /// provider used to normalize prompt usage.
    policy: RuntimeSessionPolicy,
    /// Turn index written into the pipeline state before assembly.
    turn_index: usize,
    /// Queued work completions forwarded to the final commit and reported in
    /// the `queued_work.completed` trace event.
    queued_work_completions: Vec<crate::QueuedWorkCompletion>,
    /// Turn id used for the commit and for trace events emitted while finishing.
    trace_turn_id: String,
}
175
176impl LashRuntime {
177 fn max_context_tokens(&self) -> usize {
178 self.state.effective_policy().context_window_tokens()
179 }
180
181 async fn claim_session_execution_lease(
182 &self,
183 cancel: CancellationToken,
184 busy_is_error: bool,
185 ) -> Result<Option<SessionExecutionLeaseGuard>, RuntimeError> {
186 let Some(store) = self
187 .session
188 .as_ref()
189 .and_then(|session| session.history_store())
190 else {
191 return Ok(None);
192 };
193 match SessionExecutionLeaseGuard::try_acquire(
194 store,
195 &self.state.session_id,
196 &self.runtime_scope_id,
197 Arc::clone(&self.host.core.clock),
198 cancel,
199 )
200 .await
201 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?
202 {
203 Some(lease) => Ok(Some(lease)),
204 None if busy_is_error => Err(RuntimeError::new(
205 RuntimeErrorCode::SessionExecutionBusy,
206 format!(
207 "session `{}` is already executing on another runtime owner",
208 self.state.session_id
209 ),
210 )),
211 None => Ok(None),
212 }
213 }
214
215 async fn settle_session_execution_lease<T>(
216 &self,
217 guard: Option<&SessionExecutionLeaseGuard>,
218 result: Result<T, RuntimeError>,
219 ) -> Result<T, RuntimeError> {
220 match result {
221 Ok(value) => {
222 if let Some(guard) = guard {
223 guard.release_if_live().await.map_err(|err| {
224 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
225 })?;
226 }
227 Ok(value)
228 }
229 Err(err) => {
230 if err.code != RuntimeErrorCode::StoreCommitFailed
231 && let Some(guard) = guard
232 && let Err(release_err) = guard.release_if_live().await
233 {
234 tracing::warn!(
235 error = %release_err,
236 "failed to release session execution lease after runtime error"
237 );
238 }
239 Err(err)
240 }
241 }
242 }
243
244 async fn ensure_session_execution_lease_live(
245 &self,
246 guard: Option<&SessionExecutionLeaseGuard>,
247 ) -> Result<(), RuntimeError> {
248 let Some(guard) = guard else {
249 return Ok(());
250 };
251 guard.refresh_or_mark_lost().await.map_err(|err| {
252 RuntimeError::new(
253 RuntimeErrorCode::SessionExecutionLeaseLost,
254 format!(
255 "session execution lease for session `{}` was lost before commit: {err}",
256 self.state.session_id
257 ),
258 )
259 })
260 }
261
262 async fn abandon_queued_work_claims_after_lease_loss(
263 &self,
264 err: &RuntimeError,
265 claims: &[crate::QueuedWorkClaim],
266 ) {
267 if err.code != RuntimeErrorCode::SessionExecutionLeaseLost || claims.is_empty() {
268 return;
269 }
270 let Some(store) = self
271 .session
272 .as_ref()
273 .and_then(|session| session.history_store())
274 else {
275 return;
276 };
277 for claim in claims {
278 if let Err(abandon_err) = store.abandon_queued_work_claim(claim).await {
279 tracing::warn!(
280 error = %abandon_err,
281 session_id = %claim.session_id,
282 claim_id = %claim.claim_id,
283 "failed to abandon queued work claim after session execution lease loss"
284 );
285 }
286 }
287 }
288
289 #[doc(hidden)]
290 pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
291 self.turn_phase_probe = Some(probe);
292 }
293
294 fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
295 if let Some(probe) = self.turn_phase_probe.as_ref() {
296 probe.begin(phase);
297 }
298 }
299
300 fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
301 if let Some(probe) = self.turn_phase_probe.as_ref() {
302 probe.end(phase);
303 }
304 }
305
306 async fn finish_turn(
307 &mut self,
308 finish: TurnFinishInput,
309 events: &dyn EventSink,
310 scoped_effect_controller: &ScopedEffectController<'_>,
311 cancel_state: &CancellationToken,
312 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
313 session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
314 ) -> Result<AssembledTurn, RuntimeError> {
315 let TurnFinishInput {
316 mut turn_pipeline,
317 assembler,
318 new_messages,
319 policy,
320 turn_index,
321 queued_work_completions,
322 trace_turn_id,
323 } = finish;
324 self.policy = self.state.effective_policy().clone();
325 turn_pipeline.state_mut().policy = self.policy.clone();
326 turn_pipeline.state_mut().turn_index = turn_index;
327
328 let mut turn_usage_delta = {
329 let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
330 std::mem::take(&mut *ledger)
331 };
332 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
333 turn_usage_delta.push(TokenLedgerEntry {
334 source: "turn".to_string(),
335 model: policy.model.id.clone(),
336 usage: assembler.token_usage.clone(),
337 });
338 }
339 let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
340
341 turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
342 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
343 turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
344 }
345
346 let last_prompt_usage = assembler
347 .last_llm_usage()
348 .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
349 turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
350 let assembled_state = turn_pipeline.export_state_for_assembly();
351 let assembled = assembler.finish(
352 assembled_state,
353 cancel_state.is_cancelled(),
354 None,
355 &self.host.core.control.termination,
356 );
357
358 let Some(session) = self.session.as_ref() else {
359 self.state.apply_snapshot(&assembled.state);
360 self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
361 return Ok(assembled);
362 };
363 self.ensure_session_execution_lease_live(session_execution_lease)
364 .await?;
365
366 let plugins = Arc::clone(session.plugins());
367 let manager = match self.runtime_session_services_for_turn(None) {
368 Ok(manager) => manager,
369 Err(err) => {
370 return Err(RuntimeError::new(
371 RuntimeErrorCode::PluginSessionManager,
372 err.to_string(),
373 ));
374 }
375 };
376
377 self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
378 let finalized = match plugins
379 .finalize_turn_with_phase_probe(
380 assembled,
381 manager.state_service(),
382 manager.lifecycle_service(),
383 manager.graph_service(),
384 self.turn_phase_probe.clone(),
385 )
386 .await
387 {
388 Ok(finalized) => finalized,
389 Err(err) => {
390 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
391 return Err(RuntimeError::new(
392 RuntimeErrorCode::PluginFinalizeTurn,
393 err.to_string(),
394 ));
395 }
396 };
397 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
398 self.ensure_session_execution_lease_live(session_execution_lease)
399 .await?;
400
401 let mut returned_turn = finalized.turn;
402 let release_session_execution_lease =
403 session_execution_lease_release_policy.should_release(&returned_turn.outcome);
404 self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
405 self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
406 let queued_work_completion_trace = queued_work_completions.clone();
407 let pending_attachment_ids = self
408 .host
409 .core
410 .durability
411 .attachment_store
412 .pending_manifest_commit_ids();
413 if let Err(err) = turn_pipeline
414 .final_commit(
415 &mut returned_turn,
416 self.session.as_mut(),
417 &turn_usage_delta,
418 Some(&trace_turn_id),
419 queued_work_completions,
420 pending_attachment_ids.clone(),
421 release_session_execution_lease
422 .then(|| session_execution_lease.map(SessionExecutionLeaseGuard::completion))
423 .flatten(),
424 )
425 .await
426 {
427 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
428 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
429 return Err(err);
430 }
431 if release_session_execution_lease && let Some(lease) = session_execution_lease {
432 lease.mark_released();
433 }
434 self.host
435 .core
436 .durability
437 .attachment_store
438 .mark_manifest_committed(&pending_attachment_ids);
439 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
440
441 emit_session_events_to_sink(events, finalized.events).await;
442 self.state = turn_pipeline.into_final_state();
443 if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
444 && let Some(session) = self.session.as_mut()
445 {
446 let protocol_session = Arc::clone(session.plugins().protocol_session());
447 let session_id = self.state.session_id.clone();
448 protocol_session
449 .restore_session(
450 crate::plugin::ProtocolSessionContext::new(session, &session_id),
451 &self.state,
452 )
453 .await
454 .map_err(|err| {
455 RuntimeError::new(
456 RuntimeErrorCode::Other("protocol_restore_session".to_string()),
457 err.to_string(),
458 )
459 })?;
460 }
461 if !queued_work_completion_trace.is_empty() {
462 crate::trace::emit_trace(
463 &self.host.core.tracing.trace_sink,
464 &self.host.core.tracing.trace_context,
465 lash_trace::TraceContext::default()
466 .for_session(returned_turn.state.session_id.clone())
467 .for_turn_index(returned_turn.state.turn_index)
468 .for_turn(trace_turn_id.clone()),
469 lash_trace::TraceEvent::Custom {
470 name: "queued_work.completed".to_string(),
471 payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
472 },
473 self.host.core.clock.as_ref(),
474 );
475 }
476 self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
477 self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
478 .await?;
479 self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
480 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
481
482 self.emit_completed_turn_trace(
483 &returned_turn.state,
484 &returned_turn.outcome,
485 &trace_turn_id,
486 );
487 Ok(returned_turn)
488 }
489
490 fn emit_completed_turn_trace(
491 &self,
492 state: &SessionSnapshot,
493 outcome: &TurnOutcome,
494 trace_turn_id: &str,
495 ) {
496 if self.host.core.tracing.trace_sink.is_none() {
497 return;
498 }
499
500 let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
501 crate::trace::emit_trace(
502 &self.host.core.tracing.trace_sink,
503 &self.host.core.tracing.trace_context,
504 lash_trace::TraceContext::default()
505 .for_session(state.session_id.clone())
506 .for_turn_index(state.turn_index)
507 .for_turn(trace_turn_id.to_string()),
508 lash_trace::TraceEvent::TurnCompleted {
509 status: status.to_string(),
510 done_reason: done_reason.to_string(),
511 agent_frame_switch,
512 },
513 self.host.core.clock.as_ref(),
514 );
515 }
516
517 async fn emit_turn_persisted_event(
518 &self,
519 returned_turn: &AssembledTurn,
520 scoped_effect_controller: &ScopedEffectController<'_>,
521 trace_turn_id: &str,
522 ) -> Result<(), RuntimeError> {
523 let Some(session) = self.session.as_ref() else {
524 return Ok(());
525 };
526 let Ok(manager) = self.runtime_session_services() else {
527 return Ok(());
528 };
529 let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
530 let phase_controller = scoped_child_turn_controller(
531 scoped_effect_controller,
532 &self.state.session_id,
533 &phase_turn_id,
534 )?;
535 let direct_completions = manager.direct_completion_client(
536 RuntimeEffectControllerHandle::borrowed(phase_controller),
537 Some(phase_turn_id),
538 );
539
540 session
541 .plugins()
542 .emit_runtime_event_with_phase_probe(
543 crate::PluginLifecycleEvent::TurnPersisted(Box::new(
544 crate::SessionStateChangedContext {
545 session_id: self.state.session_id.clone(),
546 state: crate::SessionReadView::from_snapshot(&returned_turn.state),
547 sessions: manager.state_service(),
548 session_graph: manager.graph_service(),
549 direct_completions,
550 },
551 )),
552 self.turn_phase_probe.clone(),
553 )
554 .await;
555 Ok(())
556 }
557
558 pub async fn stream_turn(
560 &mut self,
561 input: TurnInput,
562 opts: TurnOptions<'_>,
563 ) -> Result<AssembledTurn, RuntimeError> {
564 let cancel = opts.cancel.clone();
565 let session_execution_lease = self
566 .claim_session_execution_lease(cancel.clone(), true)
567 .await?;
568 let scoped_effect_controller = opts.scoped_effect_controller();
569 let result = self
570 .stream_turn_with_scoped_effect_controller_inner(
571 input,
572 opts.events_or_noop(),
573 opts.turn_events_or_noop(),
574 scoped_effect_controller,
575 cancel,
576 None,
577 session_execution_lease.as_ref(),
578 SessionExecutionLeaseReleasePolicy::FinalCommit,
579 )
580 .await;
581 self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
582 .await
583 }
584
585 pub async fn stream_next_queued_work(
586 &mut self,
587 opts: TurnOptions<'_>,
588 ) -> Result<Option<AssembledTurn>, RuntimeError> {
589 self.stream_queued_work(opts, None).await
590 }
591
592 pub async fn stream_selected_queued_work(
593 &mut self,
594 opts: TurnOptions<'_>,
595 batch_ids: &[String],
596 ) -> Result<Option<AssembledTurn>, RuntimeError> {
597 self.stream_queued_work(opts, Some(batch_ids)).await
598 }
599
600 async fn stream_queued_work(
601 &mut self,
602 opts: TurnOptions<'_>,
603 selected_batch_ids: Option<&[String]>,
604 ) -> Result<Option<AssembledTurn>, RuntimeError> {
605 let cancel = opts.cancel.clone();
606 let Some(session_execution_lease) = self
607 .claim_session_execution_lease(cancel.clone(), false)
608 .await?
609 else {
610 return Ok(None);
611 };
612 let session_execution_fence = session_execution_lease.fence();
613 if self
614 .drain_next_session_command(&session_execution_fence)
615 .await?
616 .is_some()
617 {
618 session_execution_lease
619 .release_if_live()
620 .await
621 .map_err(|err| {
622 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
623 })?;
624 return Ok(None);
625 }
626 let Some(store) = self
627 .session
628 .as_ref()
629 .and_then(|session| session.history_store())
630 else {
631 return Ok(None);
632 };
633 let claim = if let Some(batch_ids) = selected_batch_ids {
634 store
635 .claim_ready_queued_work_by_batch_ids(
636 &self.state.session_id,
637 &session_execution_fence,
638 &self.runtime_scope_id,
639 crate::QueuedWorkClaimBoundary::Idle,
640 crate::QUEUED_WORK_CLAIM_TTL_MS,
641 batch_ids,
642 )
643 .await
644 } else {
645 store
646 .claim_ready_queued_work(
647 &self.state.session_id,
648 &session_execution_fence,
649 &self.runtime_scope_id,
650 crate::QueuedWorkClaimBoundary::Idle,
651 crate::QUEUED_WORK_CLAIM_TTL_MS,
652 64,
653 )
654 .await
655 }
656 .map_err(super::runtime_error_from_store_commit)?;
657 let Some(claim) = claim else {
658 session_execution_lease
659 .release_if_live()
660 .await
661 .map_err(|err| {
662 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
663 })?;
664 return Ok(None);
665 };
666 let mut work = claim.materialize_for_turn();
667 let turn_id = work
668 .input
669 .trace_turn_id
670 .clone()
671 .or_else(|| Some(opts.execution_scope_id().to_owned()))
672 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
673 work.input.trace_turn_id = Some(turn_id.clone());
674 let causes = work.turn_causes.clone();
675 emit_queued_work_started_to_sink(
676 opts.turn_events_or_noop(),
677 crate::QueuedWorkClaimBoundary::Idle,
678 &claim,
679 causes.clone(),
680 )
681 .await;
682 crate::trace::emit_trace(
683 &self.host.core.tracing.trace_sink,
684 &self.host.core.tracing.trace_context,
685 lash_trace::TraceContext::default()
686 .for_session(self.state.session_id.clone())
687 .for_turn_index(self.state.turn_index + 1)
688 .for_turn(turn_id.clone()),
689 lash_trace::TraceEvent::Custom {
690 name: "queued_work.claimed".to_string(),
691 payload: queued_work_trace_payload(
692 crate::QueuedWorkClaimBoundary::Idle,
693 &claim,
694 &causes,
695 ),
696 },
697 self.host.core.clock.as_ref(),
698 );
699 let claim_for_abandon = claim.clone();
700 let scoped_effect_controller = opts.scoped_effect_controller();
701 let result = self
702 .stream_turn_with_scoped_effect_controller_inner(
703 work.input,
704 opts.events_or_noop(),
705 opts.turn_events_or_noop(),
706 scoped_effect_controller,
707 cancel,
708 Some(claim),
709 Some(&session_execution_lease),
710 SessionExecutionLeaseReleasePolicy::FinalCommit,
711 )
712 .await
713 .map(Some);
714 if let Err(err) = &result {
715 self.abandon_queued_work_claims_after_lease_loss(
716 err,
717 std::slice::from_ref(&claim_for_abandon),
718 )
719 .await;
720 }
721 self.settle_session_execution_lease(Some(&session_execution_lease), result)
722 .await
723 }
724
725 fn ensure_durable_store_facets_for_scope(
733 &self,
734 scoped_effect_controller: &ScopedEffectController<'_>,
735 ) -> Result<(), RuntimeError> {
736 if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
737 {
738 return Ok(());
739 }
740 if self
741 .host
742 .core
743 .durability
744 .attachment_store
745 .persistence()
746 .durability_tier()
747 != crate::DurabilityTier::Durable
748 {
749 return Err(RuntimeError::durable_store_required(
750 crate::DurableStoreFacet::AttachmentStore,
751 ));
752 }
753 if self
754 .host
755 .core
756 .durability
757 .process_env_store
758 .durability_tier()
759 != crate::DurabilityTier::Durable
760 {
761 return Err(RuntimeError::durable_store_required(
762 crate::DurableStoreFacet::ProcessEnvStore,
763 ));
764 }
765 if let Some(store) = self
766 .session
767 .as_ref()
768 .and_then(|session| session.history_store())
769 && store.durability_tier() != crate::DurabilityTier::Durable
770 {
771 return Err(RuntimeError::durable_store_required(
772 crate::DurableStoreFacet::SessionStore,
773 ));
774 }
775 if let Some(process_registry) = self.host.process_registry.as_ref()
776 && process_registry.durability_tier() != crate::DurabilityTier::Durable
777 {
778 return Err(RuntimeError::durable_store_required(
779 crate::DurableStoreFacet::ProcessRegistry,
780 ));
781 }
782 Ok(())
783 }
784
785 #[allow(clippy::too_many_arguments)]
786 async fn stream_turn_with_scoped_effect_controller_inner(
787 &mut self,
788 mut input: TurnInput,
789 events: &dyn EventSink,
790 turn_events: &dyn TurnActivitySink,
791 scoped_effect_controller: ScopedEffectController<'_>,
792 cancel: CancellationToken,
793 queued_claim: Option<crate::QueuedWorkClaim>,
794 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
795 session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
796 ) -> Result<AssembledTurn, RuntimeError> {
797 if queued_claim.is_none() {
798 if let Some(lease) = session_execution_lease {
799 while self
800 .drain_next_session_command(&lease.fence())
801 .await?
802 .is_some()
803 {}
804 } else if self
805 .session
806 .as_ref()
807 .and_then(|session| session.history_store())
808 .is_some()
809 {
810 return Err(RuntimeError::new(
811 RuntimeErrorCode::StoreCommitFailed,
812 "session command drain requires a session execution lease",
813 ));
814 }
815 }
816 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
817 && scoped_effect_controller
818 .execution_scope()
819 .validates_turn_trace_id()
820 && input_turn_id != scoped_effect_controller.scope_id()
821 {
822 return Err(RuntimeError::new(
823 RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
824 format!(
825 "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
826 scoped_effect_controller.scope_id()
827 ),
828 ));
829 }
830 self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
831 input
832 .trace_turn_id
833 .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
834 self.stream_turn_inner(
835 input.clone(),
836 events,
837 turn_events,
838 scoped_effect_controller,
839 cancel.clone(),
840 queued_claim,
841 session_execution_lease,
842 session_execution_lease_release_policy,
843 )
844 .await
845 }
846
847 pub async fn stream_turn_with_agent_frames(
856 &mut self,
857 input: TurnInput,
858 opts: TurnOptions<'_>,
859 ) -> Result<AgentFrameRun, RuntimeError> {
860 let cancel = opts.cancel.clone();
861 let session_execution_lease = self
862 .claim_session_execution_lease(cancel.clone(), true)
863 .await?;
864 let scoped_effect_controller = opts.scoped_effect_controller();
865 let result = self
866 .stream_turn_with_agent_frames_inner(
867 input,
868 opts.events_or_noop(),
869 opts.turn_events_or_noop(),
870 scoped_effect_controller,
871 cancel,
872 session_execution_lease.as_ref(),
873 )
874 .await;
875 self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
876 .await
877 }
878
879 async fn stream_turn_with_agent_frames_inner(
880 &mut self,
881 mut input: TurnInput,
882 events: &dyn EventSink,
883 turn_events: &dyn TurnActivitySink,
884 scoped_effect_controller: ScopedEffectController<'_>,
885 cancel: CancellationToken,
886 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
887 ) -> Result<AgentFrameRun, RuntimeError> {
888 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
889 && scoped_effect_controller
890 .execution_scope()
891 .validates_turn_trace_id()
892 && input_turn_id != scoped_effect_controller.scope_id()
893 {
894 return Err(RuntimeError::new(
895 RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
896 format!(
897 "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
898 scoped_effect_controller.scope_id()
899 ),
900 ));
901 }
902 let follow_protocol_turn_options = input.protocol_turn_options.clone();
903 let follow_turn_context = input.turn_context.clone();
904 let follow_trace_turn_id = input
905 .trace_turn_id
906 .clone()
907 .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
908 input
909 .trace_turn_id
910 .get_or_insert(follow_trace_turn_id.clone());
911 let mut turns = Vec::new();
912 loop {
913 let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
914 input.trace_turn_id = Some(turn_trace_turn_id.clone());
915 let turn_effect_controller = if turns.is_empty() {
916 scoped_effect_controller.clone()
917 } else {
918 ScopedEffectController::borrowed(
919 scoped_effect_controller.controller(),
920 ExecutionScope::turn(&self.state.session_id, &turn_trace_turn_id),
921 )?
922 };
923 let turn = self
924 .stream_turn_with_scoped_effect_controller_inner(
925 input,
926 events,
927 turn_events,
928 turn_effect_controller,
929 cancel.clone(),
930 None,
931 session_execution_lease,
932 SessionExecutionLeaseReleasePolicy::KeepOnAgentFrameSwitch,
933 )
934 .await?;
935 let switched_frame = match &turn.outcome {
936 TurnOutcome::AgentFrameSwitch { frame_id, task } => {
937 Some((frame_id.clone(), task.clone()))
938 }
939 _ => None,
940 };
941 turns.push(turn);
942
943 let Some((_frame_id, task)) = switched_frame else {
944 return Ok(AgentFrameRun { turns });
945 };
946 input = turn_input_from_text(task);
947 input.protocol_turn_options = follow_protocol_turn_options.clone();
948 input.turn_context = follow_turn_context.clone();
949 }
950 }
951
952 #[allow(clippy::too_many_arguments)]
953 async fn stream_turn_inner(
954 &mut self,
955 mut input: TurnInput,
956 events: &dyn EventSink,
957 turn_events: &dyn TurnActivitySink,
958 scoped_effect_controller: ScopedEffectController<'_>,
959 cancel: CancellationToken,
960 queued_claim: Option<crate::QueuedWorkClaim>,
961 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
962 session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
963 ) -> Result<AssembledTurn, RuntimeError> {
964 self.refresh_session_graph_from_store()
965 .await
966 .map_err(session_head_refresh_error)?;
967 let input_trace_turn_id = input.trace_turn_id.clone();
968 let queued_turn_work = queued_claim
969 .as_ref()
970 .map(crate::QueuedWorkClaim::materialize_for_turn);
971 if let Some(work) = queued_turn_work.as_ref()
972 && input.items.is_empty()
973 && input.image_blobs.is_empty()
974 {
975 input = work.input.clone();
976 if input.trace_turn_id.is_none() {
977 input.trace_turn_id = input_trace_turn_id;
978 }
979 }
980 if self
981 .session
982 .as_ref()
983 .and_then(|session| session.history_store())
984 .is_some()
985 {
986 ensure_durable_effect_input(&input)?;
987 }
988 if let Some(extension) = &input.protocol_extension
989 && let Some(session) = self.session.as_ref()
990 {
991 let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
992 protocol_session
993 .validate_turn_extension(extension)
994 .await
995 .map_err(|err| {
996 RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
997 })?;
998 }
999 let previous_prompt_usage = self.state.last_prompt_usage.clone();
1000 let normalized = match self
1001 .normalize_input_items(&input.items, &input.image_blobs)
1002 .await
1003 {
1004 Ok(items) => items,
1005 Err(e) => {
1006 self.state.last_prompt_usage = None;
1007 let mut assembler = TurnAssembler::default();
1008 let error_event = SessionEvent::Error {
1009 message: e.clone(),
1010 envelope: Some(crate::session_model::ErrorEnvelope {
1011 kind: "input_validation".to_string(),
1012 code: Some("invalid_turn_input".to_string()),
1013 terminal_reason: None,
1014 user_message: e.clone(),
1015 raw: None,
1016 }),
1017 };
1018 assembler.push(&error_event);
1019 emit_turn_activity_to_sink(
1020 turn_events,
1021 TurnActivity::independent(TurnEvent::Error { message: e }),
1022 )
1023 .await;
1024 emit_session_event_to_sink(events, error_event).await;
1025 let outcome_event = SessionEvent::TurnOutcome {
1026 outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
1027 };
1028 assembler.push(&outcome_event);
1029 emit_session_event_to_sink(events, outcome_event).await;
1030 assembler.push(&SessionEvent::Done);
1031 emit_session_event_to_sink(events, SessionEvent::Done).await;
1032 return Ok(assembler.finish(
1033 self.state.to_snapshot(),
1034 false,
1035 None,
1036 &self.host.core.control.termination,
1037 ));
1038 }
1039 };
1040 let turn_index = self.state.turn_index + 1;
1041 let trace_turn_id = input
1042 .trace_turn_id
1043 .clone()
1044 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1045 if self.host.core.tracing.trace_sink.is_some() {
1046 let mut trace_metadata = std::collections::BTreeMap::new();
1047 trace_metadata.insert(
1048 "input_item_count".to_string(),
1049 serde_json::json!(normalized.len()),
1050 );
1051 crate::trace::emit_trace(
1052 &self.host.core.tracing.trace_sink,
1053 &self.host.core.tracing.trace_context,
1054 lash_trace::TraceContext::default()
1055 .for_session(self.state.session_id.clone())
1056 .for_turn_index(turn_index)
1057 .for_turn(trace_turn_id.clone()),
1058 lash_trace::TraceEvent::TurnStarted {
1059 metadata: trace_metadata,
1060 },
1061 self.host.core.clock.as_ref(),
1062 );
1063 }
1064
1065 let base_read_model = self.state.read_model();
1066 let base_messages = base_read_model.messages;
1067 let base_render_cache = base_read_model.prompt_render_cache;
1068 let mut turn_delta = Vec::new();
1069 let initial_turn_causes = queued_turn_work
1070 .as_ref()
1071 .map(|work| work.turn_causes.clone())
1072 .unwrap_or_default();
1073 turn_delta.extend(
1074 initial_turn_causes
1075 .iter()
1076 .map(crate::TurnCause::to_event_message),
1077 );
1078
1079 let user_id = fresh_message_id();
1080 let mut user_parts: Vec<Part> = Vec::new();
1081 for item in normalized {
1082 match item {
1083 NormalizedItem::Text(text) => {
1084 if text.is_empty() {
1085 continue;
1086 }
1087 user_parts.push(Part {
1088 id: format!("{}.p{}", user_id, user_parts.len()),
1089 kind: PartKind::Text,
1090 content: text,
1091 attachment: None,
1092 tool_call_id: None,
1093 tool_name: None,
1094 tool_replay: None,
1095 prune_state: PruneState::Intact,
1096 reasoning_meta: None,
1097 response_meta: None,
1098 });
1099 }
1100 NormalizedItem::Image(reference) => {
1101 user_parts.push(Part {
1102 id: format!("{}.p{}", user_id, user_parts.len()),
1103 kind: PartKind::Image,
1104 content: String::new(),
1105 attachment: Some(crate::session_model::message::PartAttachment {
1106 reference,
1107 }),
1108 tool_call_id: None,
1109 tool_name: None,
1110 tool_replay: None,
1111 prune_state: PruneState::Intact,
1112 reasoning_meta: None,
1113 response_meta: None,
1114 });
1115 }
1116 }
1117 }
1118 if user_parts.is_empty() && initial_turn_causes.is_empty() {
1119 user_parts.push(Part {
1120 id: format!("{}.p0", user_id),
1121 kind: PartKind::Text,
1122 content: String::new(),
1123 attachment: None,
1124 tool_call_id: None,
1125 tool_name: None,
1126 tool_replay: None,
1127 prune_state: PruneState::Intact,
1128 reasoning_meta: None,
1129 response_meta: None,
1130 });
1131 }
1132 if !user_parts.is_empty() {
1133 reassign_part_ids(&user_id, &mut user_parts);
1134 turn_delta.push(Message {
1135 id: user_id.clone(),
1136 role: MessageRole::User,
1137 parts: shared_parts(user_parts),
1138 origin: None,
1139 });
1140 }
1141
1142 let manager = self
1143 .runtime_session_services_for_turn(None)
1144 .map_err(|err| {
1145 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1146 })?;
1147 let plugin_session = self
1148 .session
1149 .as_ref()
1150 .map(|s| Arc::clone(s.plugins()))
1151 .ok_or_else(|| {
1152 RuntimeError::new(
1153 RuntimeErrorCode::ContextPrepareTurn,
1154 "runtime session not available",
1155 )
1156 })?;
1157 let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
1158 let prepare_phase_controller = scoped_child_turn_controller(
1159 &scoped_effect_controller,
1160 &self.state.session_id,
1161 &prepare_phase_turn_id,
1162 )?;
1163 let turn_ctx = crate::TurnTransformContext {
1164 session_id: self.state.session_id.clone(),
1165 state: self.read_view(),
1166 prompt_usage: previous_prompt_usage.clone(),
1167 max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
1168 sessions: manager.state_service(),
1169 session_lifecycle: manager.lifecycle_service(),
1170 session_graph: manager.graph_service(),
1171 scoped_effect_controller: scoped_effect_controller.clone(),
1172 direct_completions: manager.direct_completion_client(
1173 RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
1174 Some(prepare_phase_turn_id),
1175 ),
1176 };
1177 self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
1178 let prepared_context = plugin_session
1179 .prepare_turn_context(
1180 &turn_ctx,
1181 crate::session_model::context::PreparedContext {
1182 messages: crate::MessageSequence::from_base_and_delta(
1183 base_messages,
1184 turn_delta,
1185 )
1186 .with_base_render_cache(base_render_cache),
1187 ..Default::default()
1188 },
1189 self.turn_phase_probe.clone(),
1190 )
1191 .await
1192 .map_err(|err| {
1193 RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
1194 })?;
1195 self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
1196 drop(turn_ctx);
1201 let messages = prepared_context.messages;
1202 if let Some(session) = self.session.as_mut() {
1203 session
1204 .set_context_overlay(
1205 prepared_context.tool_providers,
1206 prepared_context.prompt_contributions,
1207 prepared_context.include_base_tools,
1208 )
1209 .map_err(|err| {
1210 RuntimeError::new(
1211 RuntimeErrorCode::Other("session_tool_registry".to_string()),
1212 err.to_string(),
1213 )
1214 })?;
1215 }
1216
1217 self.state.last_prompt_usage = None;
1218 Box::pin(self.stream_prepared_turn_inner(
1219 messages,
1220 previous_prompt_usage,
1221 input.protocol_turn_options.clone(),
1222 input.protocol_extension.clone(),
1223 input.turn_context.clone(),
1224 initial_turn_causes,
1225 trace_turn_id,
1226 turn_index,
1227 events,
1228 turn_events,
1229 scoped_effect_controller,
1230 cancel,
1231 queued_claim,
1232 session_execution_lease,
1233 session_execution_lease_release_policy,
1234 ))
1235 .await
1236 }
1237
1238 pub async fn run_turn_assembled(
1240 &mut self,
1241 input: TurnInput,
1242 cancel: CancellationToken,
1243 scoped_effect_controller: ScopedEffectController<'_>,
1244 ) -> Result<AssembledTurn, RuntimeError> {
1245 self.stream_turn(input, TurnOptions::new(cancel, scoped_effect_controller))
1246 .await
1247 }
1248
1249 #[allow(clippy::too_many_arguments)]
1251 pub async fn stream_prepared_turn(
1252 &mut self,
1253 messages: crate::MessageSequence,
1254 previous_prompt_usage: Option<PromptUsage>,
1255 protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1256 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1257 turn_context: crate::TurnContext,
1258 initial_turn_causes: Vec<crate::TurnCause>,
1259 trace_turn_id: String,
1260 turn_index: usize,
1261 events: &dyn EventSink,
1262 turn_events: &dyn TurnActivitySink,
1263 scoped_effect_controller: ScopedEffectController<'_>,
1264 cancel: CancellationToken,
1265 initial_queue_claim: Option<crate::QueuedWorkClaim>,
1266 ) -> Result<AssembledTurn, RuntimeError> {
1267 let session_execution_lease = self
1268 .claim_session_execution_lease(cancel.clone(), true)
1269 .await?;
1270 let result = self
1271 .stream_prepared_turn_inner(
1272 messages,
1273 previous_prompt_usage,
1274 protocol_turn_options,
1275 protocol_extension,
1276 turn_context,
1277 initial_turn_causes,
1278 trace_turn_id,
1279 turn_index,
1280 events,
1281 turn_events,
1282 scoped_effect_controller,
1283 cancel,
1284 initial_queue_claim,
1285 session_execution_lease.as_ref(),
1286 SessionExecutionLeaseReleasePolicy::FinalCommit,
1287 )
1288 .await;
1289 self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
1290 .await
1291 }
1292
/// Executes one prepared turn end to end while the caller owns the session
/// execution lease.
///
/// Stages, in order:
/// 1. reject the call when the session has a history store but no lease was given;
/// 2. derive the per-turn policy and protocol options from session state plus
///    the overrides carried by `turn_context` / `protocol_turn_options`;
/// 3. run the plugins' `prepare_turn` hook (phase `BeforeTurnHooks`) while
///    forwarding any stream events that arrive in the meantime;
/// 4. if the plugins abort, emit `Error`, `TurnOutcome::Stopped(PluginAbort)` and
///    `Done`, and return `Ok` with the issue attached;
/// 5. otherwise write the prepared checkpoint, resolve the provider policy, move
///    the session into a `RuntimeTurnDriver` and drive it (phase `EffectLoop`);
/// 6. put the session back on `self` and hand everything to `finish_turn`.
///
/// `session_execution_lease` is only borrowed: this function derives a fence from
/// it and forwards it, together with `session_execution_lease_release_policy`, to
/// `finish_turn`. `_previous_prompt_usage` is accepted but not read here.
///
/// # Errors
/// - `RuntimeErrorCode::StoreCommitFailed` when a history store exists but no
///   lease was supplied.
/// - `RuntimeErrorCode::PluginSessionManager` when session services cannot be built.
/// - `RuntimeErrorCode::PluginPrepareTurn` when the `prepare_turn` hook fails.
/// - A store-commit error (via `runtime_error_from_store_commit`) when the
///   prepared checkpoint fails.
/// - An error with code `"llm_provider"` when the provider policy cannot be resolved.
/// - Any error returned by the driver run or by `finish_turn`; in both cases
///   `abandon_queued_work_claims_after_lease_loss` is invoked first.
///
/// # Panics
/// Panics if `self.session` is `None` when the plugins handle is fetched or when
/// the session is moved into the driver.
// One parameter per piece of per-turn state, hence the lint opt-out.
#[allow(clippy::too_many_arguments)]
async fn stream_prepared_turn_inner(
    &mut self,
    messages: crate::MessageSequence,
    _previous_prompt_usage: Option<PromptUsage>,
    protocol_turn_options: Option<crate::ProtocolTurnOptions>,
    protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
    turn_context: crate::TurnContext,
    initial_turn_causes: Vec<crate::TurnCause>,
    trace_turn_id: String,
    turn_index: usize,
    events: &dyn EventSink,
    turn_events: &dyn TurnActivitySink,
    scoped_effect_controller: ScopedEffectController<'_>,
    cancel: CancellationToken,
    initial_queue_claim: Option<crate::QueuedWorkClaim>,
    session_execution_lease: Option<&SessionExecutionLeaseGuard>,
    session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
) -> Result<AssembledTurn, RuntimeError> {
    // A session that has a history store must not run a turn without a lease.
    if session_execution_lease.is_none()
        && self
            .session
            .as_ref()
            .and_then(|session| session.history_store())
            .is_some()
    {
        return Err(RuntimeError::new(
            RuntimeErrorCode::StoreCommitFailed,
            "prepared turn requires a session execution lease",
        ));
    }
    // The fence derived from the lease is what the turn boundary and driver carry.
    let session_execution_fence =
        session_execution_lease.map(SessionExecutionLeaseGuard::fence);
    // Bounded channel (capacity 100) carrying stream events produced during the
    // turn; the child-usage relay receives its own clone of the sender.
    let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
    let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
    // Start from a copy of the session's effective policy, then apply the
    // provider / model overrides carried by this turn's context, if any.
    let mut turn_policy = self.state.effective_policy().clone();
    let turn_provider_override = turn_context.provider().cloned();
    if let Some(provider) = turn_provider_override.as_ref() {
        turn_policy.provider_id = provider.kind().to_string();
    }
    if let Some(model) = turn_context.model_spec() {
        turn_policy.model = model.clone();
    }
    // Per-turn protocol options, when supplied, are combined with the session's
    // options through `merged_with_override`; otherwise the session's are used.
    let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
    let effective_protocol_turn_options = protocol_turn_options
        .clone()
        .map(|options| session_protocol_turn_options.merged_with_override(&options))
        .unwrap_or(session_protocol_turn_options);
    let manager = self
        .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
        .map_err(|err| {
            RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
        })?;
    let plugins = {
        let session = self
            .session
            .as_ref()
            .expect("lash runtime session must be available");
        Arc::clone(session.plugins())
    };
    let mut assembler = TurnAssembler::new();
    self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
    let prepared = {
        let prepare_turn = plugins.prepare_turn_with_phase_probe(
            PrepareTurnRequest {
                session_id: self.state.session_id.clone(),
                state: crate::SessionReadView::from_runtime_state(
                    &self.state,
                    turn_policy.clone(),
                    effective_protocol_turn_options.clone(),
                ),
                messages,
                sessions: manager.state_service(),
                session_lifecycle: manager.lifecycle_service(),
                session_graph: manager.graph_service(),
                turn_context: turn_context.clone(),
            },
            self.turn_phase_probe.clone(),
        );
        let mut prepare_turn = Box::pin(prepare_turn);

        // Poll the hook and the event channel together so events sent while the
        // hook is running are forwarded to the sinks as they arrive.
        loop {
            tokio::select! {
                prepared = prepare_turn.as_mut() => {
                    // NOTE(review): on failure the `?` returns before
                    // `mark_phase_end(BeforeTurnHooks)` is reached.
                    let prepared = prepared.map_err(|err| {
                        RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
                    })?;
                    self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
                    break prepared;
                }
                maybe_event = event_rx.recv() => {
                    // `event_tx` is still alive in this scope, so `None` is not
                    // expected here; it is ignored if it does occur.
                    if let Some(event) = maybe_event {
                        emit_runtime_stream_event_to_sinks(
                            events,
                            turn_events,
                            event,
                            &mut assembler,
                        )
                        .await;
                    }
                }
            }
        }
    };
    // Events returned by the hook are recorded in the assembler, then emitted.
    for event in &prepared.events {
        assembler.push(event);
    }
    emit_session_events_to_sink(events, prepared.events).await;
    // Plugin abort: the turn stops here and is reported as `Ok` with an issue.
    // NOTE(review): `initial_queue_claim` is not referenced on this path.
    if let Some(abort) = prepared.abort {
        drop(event_tx);

        // The returned snapshot comes from a clone of the state with the prepared
        // messages applied; `self.state` itself is not assigned on this path.
        let mut turn_pipeline = TurnBoundary::from_state_with_clock(
            self.state.clone(),
            Arc::clone(&self.host.core.clock),
        );
        turn_pipeline.apply_prepared_messages(&prepared.messages);
        let state = turn_pipeline.into_final_state();
        let issue = TurnIssue {
            kind: "plugin".to_string(),
            code: Some(abort.code),
            terminal_reason: None,
            message: abort.message.clone(),
            raw: None,
        };
        let error_event = SessionEvent::Error {
            message: abort.message,
            envelope: Some(crate::session_model::ErrorEnvelope {
                kind: "plugin".to_string(),
                code: issue.code.clone(),
                terminal_reason: None,
                user_message: issue.message.clone(),
                raw: None,
            }),
        };
        // Emission order: turn-activity error, session error, outcome, done.
        assembler.push(&error_event);
        emit_turn_activity_to_sink(
            turn_events,
            TurnActivity::independent(TurnEvent::Error {
                message: issue.message.clone(),
            }),
        )
        .await;
        emit_session_event_to_sink(events, error_event).await;
        let outcome_event = SessionEvent::TurnOutcome {
            outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
        };
        assembler.push(&outcome_event);
        emit_session_event_to_sink(events, outcome_event).await;
        assembler.push(&SessionEvent::Done);
        emit_session_event_to_sink(events, SessionEvent::Done).await;
        return Ok(assembler.finish(
            state.to_snapshot(),
            cancel.is_cancelled(),
            Some(issue),
            &self.host.core.control.termination,
        ));
    }
    // Normal path: the turn boundary works on a clone of the state and carries
    // the lease fence.
    let mut turn_pipeline = TurnBoundary::from_state_with_clock(
        self.state.clone(),
        Arc::clone(&self.host.core.clock),
    )
    .with_session_execution_lease(session_execution_fence.clone());
    let store = self
        .session
        .as_ref()
        .and_then(|session| session.history_store());
    // The prepared checkpoint receives no store when the controller reports the
    // `Durable` tier; otherwise it receives the session's history store.
    // NOTE(review): presumably a durable controller persists progress itself —
    // confirm against the controller implementation.
    let progress_store = if scoped_effect_controller.controller().durability_tier()
        == crate::DurabilityTier::Durable
    {
        None
    } else {
        store.as_ref().map(|store| store.as_ref())
    };
    turn_pipeline
        .prepared_checkpoint(
            progress_store,
            turn_policy.clone(),
            turn_index,
            &prepared.messages,
            self.session.as_mut(),
        )
        .await
        .map_err(super::runtime_error_from_store_commit)?;
    // A per-turn provider override is wrapped with the runtime clock and used
    // directly; otherwise the host resolves the policy for this session.
    let resolved_turn_policy = if let Some(provider) = turn_provider_override {
        RuntimeSessionPolicy::from_provider(
            turn_policy.clone(),
            provider.with_clock(Arc::clone(&self.host.core.clock)),
        )
        .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
    } else {
        self.host
            .resolve_session_policy(&self.state.session_id, turn_policy.clone())
            .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
    };
    // Session services are built a second time (shadowing the earlier binding);
    // this instance is moved into the driver.
    let manager = self
        .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
        .map_err(|err| {
            RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
        })?;
    // Copies kept for `finish_turn`; the originals are moved into the driver/run.
    let cancel_state = cancel.clone();
    let finish_scoped_effect_controller = scoped_effect_controller.clone();
    // The session is moved out of `self` for the duration of the run and is put
    // back on both the error and the success path below.
    let session = self
        .session
        .take()
        .expect("lash runtime session must be available");
    let mut driver = RuntimeTurnDriver {
        session,
        policy: resolved_turn_policy,
        host: self.host.clone(),
        turn_id: scoped_effect_controller.scope_id().to_string(),
        scoped_effect_controller,
        session_id: self.state.session_id.clone(),
        turn_index,
        turn_pipeline,
        llm_stream_summaries: HashMap::new(),
        next_llm_ordinal: 0,
        session_services: manager,
        protocol_turn_options: effective_protocol_turn_options,
        protocol_extension,
        turn_context,
        turn_causes: initial_turn_causes,
        pending_queue_claims: initial_queue_claim.into_iter().collect(),
        checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
        session_execution_lease: session_execution_fence,
        turn_phase_probe: self.turn_phase_probe.clone(),
    };
    let protocol_run_offset = 0;
    self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
    // `event_tx` is moved into the run; `drive_turn_to_completion` forwards
    // events while the run is in flight and drains the channel afterwards.
    let run_result = drive_turn_to_completion(
        driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
        &mut event_rx,
        &mut assembler,
        &child_usage_event_relay,
        events,
        turn_events,
    )
    .await;
    let (new_messages, _new_protocol_iteration) = match run_result {
        Ok(result) => result,
        Err(err) => {
            self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
            // Restore the session before returning so `self` stays usable.
            let RuntimeTurnDriver {
                session,
                pending_queue_claims,
                ..
            } = driver;
            self.session = Some(session);
            self.abandon_queued_work_claims_after_lease_loss(&err, &pending_queue_claims)
                .await;
            return Err(err);
        }
    };
    self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
    tracing::debug!(
        new_message_count = new_messages.len(),
        tool_call_count = assembler.tool_calls.len(),
        "runtime post-run_task"
    );

    // Take back what the driver owned: the session returns to `self`, the rest
    // feeds `finish_turn`.
    let RuntimeTurnDriver {
        session,
        policy,
        turn_pipeline,
        pending_queue_claims,
        ..
    } = driver;
    self.session = Some(session);
    // Separate copy for the failure path; the claims themselves are only
    // borrowed below to build their completions.
    let pending_queue_claims_for_abandon = pending_queue_claims.clone();
    let finish_result = self
        .finish_turn(
            TurnFinishInput {
                turn_pipeline,
                assembler,
                new_messages,
                policy,
                turn_index,
                queued_work_completions: pending_queue_claims
                    .iter()
                    .map(crate::QueuedWorkClaim::completion)
                    .collect(),
                trace_turn_id,
            },
            events,
            &finish_scoped_effect_controller,
            &cancel_state,
            session_execution_lease,
            session_execution_lease_release_policy,
        )
        .await;
    // The finish error is still returned to the caller after the abandon call.
    if let Err(err) = &finish_result {
        self.abandon_queued_work_claims_after_lease_loss(
            err,
            &pending_queue_claims_for_abandon,
        )
        .await;
    }
    finish_result
}
1599 async fn normalize_input_items(
1600 &self,
1601 items: &[InputItem],
1602 image_blobs: &HashMap<String, Vec<u8>>,
1603 ) -> Result<Vec<NormalizedItem>, String> {
1604 normalize_input_items(
1605 items,
1606 image_blobs,
1607 self.host.core.durability.attachment_store.as_ref(),
1608 )
1609 .await
1610 }
1611}
1612
1613fn turn_input_from_text(text: String) -> TurnInput {
1614 TurnInput {
1615 items: vec![InputItem::Text { text }],
1616 image_blobs: HashMap::new(),
1617 protocol_turn_options: None,
1618 trace_turn_id: None,
1619 protocol_extension: None,
1620 turn_context: crate::TurnContext::default(),
1621 }
1622}
1623
/// Builds the turn id for a turn that follows `completed_turn_count` earlier turns.
///
/// With a count of zero the root id is returned unchanged; for any other count
/// the id is `"{root_turn_id}:agent-frame:{completed_turn_count}"`, so the same
/// inputs always yield the same id and different counts yield different ids.
fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
    match completed_turn_count {
        0 => root_turn_id.to_owned(),
        follow_up => format!("{root_turn_id}:agent-frame:{follow_up}"),
    }
}
1631
1632pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1633 if input.protocol_extension.is_some() {
1634 return Err(RuntimeError::new(
1635 RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1636 "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1637 ));
1638 }
1639 input
1640 .turn_context
1641 .live_plugin_inputs()
1642 .durable_effect_rejection()?;
1643 Ok(())
1644}
1645
1646async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1647 if !events.is_noop() {
1648 events.emit(activity).await;
1649 }
1650}
1651
1652async fn drive_turn_to_completion<F>(
1662 run_future: F,
1663 event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1664 assembler: &mut TurnAssembler,
1665 child_usage_event_relay: &ChildUsageEventRelay,
1666 events: &dyn EventSink,
1667 turn_events: &dyn TurnActivitySink,
1668) -> Result<(crate::MessageSequence, usize), RuntimeError>
1669where
1670 F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1671{
1672 let run_result = {
1673 let mut run_future = Box::pin(run_future);
1674 loop {
1675 tokio::select! {
1676 maybe_event = event_rx.recv() => {
1677 if let Some(event) = maybe_event {
1678 emit_runtime_stream_event_to_sinks(
1679 events,
1680 turn_events,
1681 event,
1682 assembler,
1683 )
1684 .await;
1685 }
1686 }
1687 completed = run_future.as_mut() => {
1688 child_usage_event_relay.clear();
1689 break completed;
1690 }
1691 }
1692 }
1693 };
1694 while let Some(event) = event_rx.recv().await {
1695 emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1696 }
1697 run_result
1698}
1699
1700async fn emit_runtime_stream_event_to_sinks(
1701 events: &dyn EventSink,
1702 turn_events: &dyn TurnActivitySink,
1703 event: RuntimeStreamEvent,
1704 assembler: &mut TurnAssembler,
1705) {
1706 match event {
1707 RuntimeStreamEvent::Session(event) => {
1708 assembler.push(&event);
1709 emit_session_event_to_sink(events, event).await;
1710 }
1711 RuntimeStreamEvent::Turn(activity) => {
1712 assembler.push_turn_activity(&activity);
1713 emit_turn_activity_to_sink(turn_events, activity).await;
1714 }
1715 }
1716}
1717
#[cfg(test)]
mod tests {
    use super::agent_frame_follow_turn_id;

    /// The root id is reused for count 0; later counts get a numbered
    /// `:agent-frame:` suffix.
    #[test]
    fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
        let expectations = [
            (0, "root-turn"),
            (1, "root-turn:agent-frame:1"),
            (2, "root-turn:agent-frame:2"),
        ];
        for (completed_turn_count, expected_id) in expectations {
            assert_eq!(
                agent_frame_follow_turn_id("root-turn", completed_turn_count),
                expected_id
            );
        }
    }
}