1use super::*;
2
3fn trace_fields_from_outcome(
4 outcome: &TurnOutcome,
5) -> (
6 &'static str,
7 &'static str,
8 Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10 match outcome {
11 TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12 ("completed", "assistant_message", None)
13 }
14 TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
15 ("completed", "submitted_value", None)
16 }
17 TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
18 TurnOutcome::AgentFrameSwitch { frame_id, .. } => (
19 "completed",
20 "agent_frame_switch",
21 Some(lash_trace::TraceAgentFrameSwitch {
22 frame_id: frame_id.clone(),
23 }),
24 ),
25 TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
26 }
27}
28
29fn trace_stop_reason(stop: &TurnStop) -> &'static str {
30 match stop {
31 TurnStop::Cancelled => "cancelled",
32 TurnStop::Incomplete => "incomplete",
33 TurnStop::InvalidInput => "invalid_input",
34 TurnStop::MaxTurns => "max_turns",
35 TurnStop::ToolFailure => "tool_failure",
36 TurnStop::ProviderError => "provider_error",
37 TurnStop::PluginAbort => "plugin_abort",
38 TurnStop::RuntimeError => "runtime_error",
39 TurnStop::SubmittedError { .. } => "submitted_error",
40 TurnStop::ToolError { .. } => "tool_error",
41 }
42}
43
44fn session_head_refresh_error(err: SessionError) -> RuntimeError {
45 RuntimeError::new(
46 RuntimeErrorCode::Other("session_head_refresh".to_string()),
47 err.to_string(),
48 )
49}
50
51#[derive(Clone, Copy)]
52enum SessionExecutionLeaseReleasePolicy {
53 FinalCommit,
54 KeepOnAgentFrameSwitch,
55}
56
57impl SessionExecutionLeaseReleasePolicy {
58 fn should_release(self, outcome: &TurnOutcome) -> bool {
59 match self {
60 Self::FinalCommit => true,
61 Self::KeepOnAgentFrameSwitch => {
62 !matches!(outcome, TurnOutcome::AgentFrameSwitch { .. })
63 }
64 }
65 }
66}
67
68fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
69 match payload {
70 crate::QueuedWorkPayload::TurnInput { .. } => "turn_input",
71 crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
72 crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
73 }
74}
75
76fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
77 claim
78 .batches
79 .iter()
80 .map(|batch| batch.batch_id.clone())
81 .collect()
82}
83
84fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
85 format!("{parent_turn_id}:{phase}")
86}
87
88fn scoped_child_turn_controller<'run>(
89 scoped_effect_controller: &'run ScopedEffectController<'_>,
90 session_id: &str,
91 turn_id: &str,
92) -> Result<ScopedEffectController<'run>, RuntimeError> {
93 ScopedEffectController::borrowed(
94 scoped_effect_controller.controller(),
95 ExecutionScope::turn(session_id, turn_id),
96 )
97}
98
99pub(in crate::runtime) fn queued_work_trace_payload(
100 boundary: crate::QueuedWorkClaimBoundary,
101 claim: &crate::QueuedWorkClaim,
102 causes: &[crate::TurnCause],
103) -> serde_json::Value {
104 serde_json::json!({
105 "boundary": boundary,
106 "claim_id": claim.claim_id,
107 "owner_id": claim.owner.owner_id,
108 "incarnation_id": claim.owner.incarnation_id,
109 "batch_ids": queued_work_batch_ids(claim),
110 "payload_types": claim.batches.iter()
111 .flat_map(|batch| batch.items.iter())
112 .map(|item| queued_work_payload_type(&item.payload))
113 .collect::<Vec<_>>(),
114 "causes": causes,
115 })
116}
117
118pub(in crate::runtime) fn queued_work_completion_trace_payload(
119 completions: &[crate::QueuedWorkCompletion],
120) -> serde_json::Value {
121 serde_json::json!({
122 "claims": completions.iter().map(|completion| {
123 serde_json::json!({
124 "session_id": completion.session_id,
125 "claim_id": completion.claim_id,
126 "batch_ids": completion.batch_ids,
127 })
128 }).collect::<Vec<_>>(),
129 })
130}
131
132async fn emit_queued_work_started_to_sink(
133 events: &dyn TurnActivitySink,
134 boundary: crate::QueuedWorkClaimBoundary,
135 claim: &crate::QueuedWorkClaim,
136 causes: Vec<crate::TurnCause>,
137) {
138 emit_turn_activity_to_sink(
139 events,
140 TurnActivity::independent(TurnEvent::QueuedWorkStarted {
141 boundary,
142 batch_ids: queued_work_batch_ids(claim),
143 causes,
144 }),
145 )
146 .await;
147}
148
149pub(in crate::runtime) async fn send_queued_work_started_event(
150 event_tx: &mpsc::Sender<RuntimeStreamEvent>,
151 boundary: crate::QueuedWorkClaimBoundary,
152 claim: &crate::QueuedWorkClaim,
153 causes: Vec<crate::TurnCause>,
154) {
155 send_turn_activity(
156 event_tx,
157 TurnActivityId::fresh(),
158 TurnEvent::QueuedWorkStarted {
159 boundary,
160 batch_ids: queued_work_batch_ids(claim),
161 causes,
162 },
163 )
164 .await;
165}
166
167struct TurnFinishInput {
168 turn_pipeline: TurnBoundary,
169 assembler: TurnAssembler,
170 new_messages: crate::MessageSequence,
171 policy: RuntimeSessionPolicy,
172 turn_index: usize,
173 queued_work_completions: Vec<crate::QueuedWorkCompletion>,
174 trace_turn_id: String,
175}
176
177impl LashRuntime {
178 fn max_context_tokens(&self) -> usize {
179 self.state.effective_policy().context_window_tokens()
180 }
181
182 async fn claim_session_execution_lease(
183 &self,
184 cancel: CancellationToken,
185 busy_is_error: bool,
186 ) -> Result<Option<SessionExecutionLeaseGuard>, RuntimeError> {
187 let Some(store) = self
188 .session
189 .as_ref()
190 .and_then(|session| session.history_store())
191 else {
192 return Ok(None);
193 };
194 match SessionExecutionLeaseGuard::try_acquire(
195 store,
196 &self.state.session_id,
197 &self.runtime_lease_owner,
198 Arc::clone(&self.host.core.clock),
199 cancel,
200 )
201 .await
202 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?
203 {
204 Some(lease) => Ok(Some(lease)),
205 None if busy_is_error => Err(RuntimeError::new(
206 RuntimeErrorCode::SessionExecutionBusy,
207 format!(
208 "session `{}` is already executing on another runtime owner",
209 self.state.session_id
210 ),
211 )),
212 None => Ok(None),
213 }
214 }
215
216 async fn settle_session_execution_lease<T>(
217 &self,
218 guard: Option<&SessionExecutionLeaseGuard>,
219 result: Result<T, RuntimeError>,
220 ) -> Result<T, RuntimeError> {
221 match result {
222 Ok(value) => {
223 if let Some(guard) = guard {
224 guard.release_if_live().await.map_err(|err| {
225 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
226 })?;
227 }
228 Ok(value)
229 }
230 Err(err) => {
231 if err.code != RuntimeErrorCode::StoreCommitFailed
232 && let Some(guard) = guard
233 && let Err(release_err) = guard.release_if_live().await
234 {
235 tracing::warn!(
236 error = %release_err,
237 "failed to release session execution lease after runtime error"
238 );
239 }
240 Err(err)
241 }
242 }
243 }
244
245 async fn ensure_session_execution_lease_live(
246 &self,
247 guard: Option<&SessionExecutionLeaseGuard>,
248 ) -> Result<(), RuntimeError> {
249 let Some(guard) = guard else {
250 return Ok(());
251 };
252 guard.refresh_or_mark_lost().await.map_err(|err| {
253 RuntimeError::new(
254 RuntimeErrorCode::SessionExecutionLeaseLost,
255 format!(
256 "session execution lease for session `{}` was lost before commit: {err}",
257 self.state.session_id
258 ),
259 )
260 })
261 }
262
263 async fn abandon_queued_work_claims_after_lease_loss(
264 &self,
265 err: &RuntimeError,
266 claims: &[crate::QueuedWorkClaim],
267 ) {
268 if err.code != RuntimeErrorCode::SessionExecutionLeaseLost || claims.is_empty() {
269 return;
270 }
271 let Some(store) = self
272 .session
273 .as_ref()
274 .and_then(|session| session.history_store())
275 else {
276 return;
277 };
278 for claim in claims {
279 if let Err(abandon_err) = store.abandon_queued_work_claim(claim).await {
280 tracing::warn!(
281 error = %abandon_err,
282 session_id = %claim.session_id,
283 claim_id = %claim.claim_id,
284 "failed to abandon queued work claim after session execution lease loss"
285 );
286 }
287 }
288 }
289
290 #[doc(hidden)]
291 pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
292 self.turn_phase_probe = Some(probe);
293 }
294
295 fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
296 if let Some(probe) = self.turn_phase_probe.as_ref() {
297 probe.begin(phase);
298 }
299 }
300
301 fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
302 if let Some(probe) = self.turn_phase_probe.as_ref() {
303 probe.end(phase);
304 }
305 }
306
307 async fn finish_turn(
308 &mut self,
309 finish: TurnFinishInput,
310 events: &dyn EventSink,
311 scoped_effect_controller: &ScopedEffectController<'_>,
312 cancel_state: &CancellationToken,
313 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
314 session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
315 ) -> Result<AssembledTurn, RuntimeError> {
316 let TurnFinishInput {
317 mut turn_pipeline,
318 assembler,
319 new_messages,
320 policy,
321 turn_index,
322 queued_work_completions,
323 trace_turn_id,
324 } = finish;
325 self.policy = self.state.effective_policy().clone();
326 turn_pipeline.state_mut().policy = self.policy.clone();
327 turn_pipeline.state_mut().turn_index = turn_index;
328
329 let mut turn_usage_delta = {
330 let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
331 std::mem::take(&mut *ledger)
332 };
333 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
334 turn_usage_delta.push(TokenLedgerEntry {
335 source: "turn".to_string(),
336 model: policy.model.id.clone(),
337 usage: assembler.token_usage.clone(),
338 });
339 }
340 let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
341
342 turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
343 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
344 turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
345 }
346
347 let last_prompt_usage = assembler
348 .last_llm_usage()
349 .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
350 turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
351 let assembled_state = turn_pipeline.export_state_for_assembly();
352 let assembled = assembler.finish(
353 assembled_state,
354 cancel_state.is_cancelled(),
355 None,
356 &self.host.core.control.termination,
357 );
358
359 let Some(session) = self.session.as_ref() else {
360 self.state.apply_snapshot(&assembled.state);
361 self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
362 return Ok(assembled);
363 };
364 self.ensure_session_execution_lease_live(session_execution_lease)
365 .await?;
366
367 let plugins = Arc::clone(session.plugins());
368 let manager = match self.runtime_session_services_for_turn(None) {
369 Ok(manager) => manager,
370 Err(err) => {
371 return Err(RuntimeError::new(
372 RuntimeErrorCode::PluginSessionManager,
373 err.to_string(),
374 ));
375 }
376 };
377
378 self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
379 let finalized = match plugins
380 .finalize_turn_with_phase_probe(
381 assembled,
382 manager.state_service(),
383 manager.lifecycle_service(),
384 manager.graph_service(),
385 self.turn_phase_probe.clone(),
386 )
387 .await
388 {
389 Ok(finalized) => finalized,
390 Err(err) => {
391 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
392 return Err(RuntimeError::new(
393 RuntimeErrorCode::PluginFinalizeTurn,
394 err.to_string(),
395 ));
396 }
397 };
398 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
399 self.ensure_session_execution_lease_live(session_execution_lease)
400 .await?;
401
402 let mut returned_turn = finalized.turn;
403 let release_session_execution_lease =
404 session_execution_lease_release_policy.should_release(&returned_turn.outcome);
405 self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
406 self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
407 let queued_work_completion_trace = queued_work_completions.clone();
408 let pending_attachment_ids = self
409 .host
410 .core
411 .durability
412 .attachment_store
413 .pending_manifest_commit_ids();
414 if let Err(err) = turn_pipeline
415 .final_commit(
416 &mut returned_turn,
417 self.session.as_mut(),
418 &turn_usage_delta,
419 Some(&trace_turn_id),
420 queued_work_completions,
421 pending_attachment_ids.clone(),
422 release_session_execution_lease
423 .then(|| session_execution_lease.map(SessionExecutionLeaseGuard::completion))
424 .flatten(),
425 )
426 .await
427 {
428 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
429 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
430 return Err(err);
431 }
432 if release_session_execution_lease && let Some(lease) = session_execution_lease {
433 lease.mark_released();
434 }
435 self.host
436 .core
437 .durability
438 .attachment_store
439 .mark_manifest_committed(&pending_attachment_ids);
440 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
441
442 emit_session_events_to_sink(events, finalized.events).await;
443 self.state = turn_pipeline.into_final_state();
444 if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
445 && let Some(session) = self.session.as_mut()
446 {
447 let protocol_session = Arc::clone(session.plugins().protocol_session());
448 let session_id = self.state.session_id.clone();
449 protocol_session
450 .restore_session(
451 crate::plugin::ProtocolSessionContext::new(session, &session_id),
452 &self.state,
453 )
454 .await
455 .map_err(|err| {
456 RuntimeError::new(
457 RuntimeErrorCode::Other("protocol_restore_session".to_string()),
458 err.to_string(),
459 )
460 })?;
461 }
462 if !queued_work_completion_trace.is_empty() {
463 crate::trace::emit_trace(
464 &self.host.core.tracing.trace_sink,
465 &self.host.core.tracing.trace_context,
466 lash_trace::TraceContext::default()
467 .for_session(returned_turn.state.session_id.clone())
468 .for_turn_index(returned_turn.state.turn_index)
469 .for_turn(trace_turn_id.clone()),
470 lash_trace::TraceEvent::Custom {
471 name: "queued_work.completed".to_string(),
472 payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
473 },
474 self.host.core.clock.as_ref(),
475 );
476 }
477 self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
478 self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
479 .await?;
480 self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
481 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
482
483 self.emit_completed_turn_trace(
484 &returned_turn.state,
485 &returned_turn.outcome,
486 &trace_turn_id,
487 );
488 Ok(returned_turn)
489 }
490
491 fn emit_completed_turn_trace(
492 &self,
493 state: &SessionSnapshot,
494 outcome: &TurnOutcome,
495 trace_turn_id: &str,
496 ) {
497 if self.host.core.tracing.trace_sink.is_none() {
498 return;
499 }
500
501 let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
502 crate::trace::emit_trace(
503 &self.host.core.tracing.trace_sink,
504 &self.host.core.tracing.trace_context,
505 lash_trace::TraceContext::default()
506 .for_session(state.session_id.clone())
507 .for_turn_index(state.turn_index)
508 .for_turn(trace_turn_id.to_string()),
509 lash_trace::TraceEvent::TurnCompleted {
510 status: status.to_string(),
511 done_reason: done_reason.to_string(),
512 agent_frame_switch,
513 },
514 self.host.core.clock.as_ref(),
515 );
516 }
517
518 async fn emit_turn_persisted_event(
519 &self,
520 returned_turn: &AssembledTurn,
521 scoped_effect_controller: &ScopedEffectController<'_>,
522 trace_turn_id: &str,
523 ) -> Result<(), RuntimeError> {
524 let Some(session) = self.session.as_ref() else {
525 return Ok(());
526 };
527 let Ok(manager) = self.runtime_session_services() else {
528 return Ok(());
529 };
530 let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
531 let phase_controller = scoped_child_turn_controller(
532 scoped_effect_controller,
533 &self.state.session_id,
534 &phase_turn_id,
535 )?;
536 let direct_completions = manager.direct_completion_client(
537 RuntimeEffectControllerHandle::borrowed(phase_controller),
538 Some(phase_turn_id),
539 );
540
541 session
542 .plugins()
543 .emit_runtime_event_with_phase_probe(
544 crate::PluginLifecycleEvent::TurnPersisted(Box::new(
545 crate::SessionStateChangedContext {
546 session_id: self.state.session_id.clone(),
547 state: crate::SessionReadView::from_snapshot(&returned_turn.state),
548 sessions: manager.state_service(),
549 session_graph: manager.graph_service(),
550 direct_completions,
551 },
552 )),
553 self.turn_phase_probe.clone(),
554 )
555 .await;
556 Ok(())
557 }
558
559 pub async fn stream_turn(
561 &mut self,
562 input: TurnInput,
563 opts: TurnOptions<'_>,
564 ) -> Result<AssembledTurn, RuntimeError> {
565 let cancel = opts.cancel.clone();
566 let session_execution_lease = self
567 .claim_session_execution_lease(cancel.clone(), true)
568 .await?;
569 let scoped_effect_controller = opts.scoped_effect_controller();
570 let result = self
571 .stream_turn_with_scoped_effect_controller_inner(
572 input,
573 opts.events_or_noop(),
574 opts.turn_events_or_noop(),
575 scoped_effect_controller,
576 cancel,
577 None,
578 session_execution_lease.as_ref(),
579 SessionExecutionLeaseReleasePolicy::FinalCommit,
580 )
581 .await;
582 self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
583 .await
584 }
585
586 pub async fn stream_next_queued_work(
587 &mut self,
588 opts: TurnOptions<'_>,
589 ) -> Result<Option<AssembledTurn>, RuntimeError> {
590 self.stream_queued_work(opts, None).await
591 }
592
593 pub async fn stream_selected_queued_work(
594 &mut self,
595 opts: TurnOptions<'_>,
596 batch_ids: &[String],
597 ) -> Result<Option<AssembledTurn>, RuntimeError> {
598 self.stream_queued_work(opts, Some(batch_ids)).await
599 }
600
601 async fn stream_queued_work(
602 &mut self,
603 opts: TurnOptions<'_>,
604 selected_batch_ids: Option<&[String]>,
605 ) -> Result<Option<AssembledTurn>, RuntimeError> {
606 let cancel = opts.cancel.clone();
607 let Some(session_execution_lease) = self
608 .claim_session_execution_lease(cancel.clone(), false)
609 .await?
610 else {
611 return Ok(None);
612 };
613 let session_execution_fence = session_execution_lease.fence();
614 loop {
615 match self
616 .drain_next_session_command(&session_execution_fence)
617 .await
618 {
619 Ok(Some(_)) => {}
620 Ok(None) => break,
621 Err(err) => {
622 let _ = session_execution_lease.release_if_live().await;
623 return Err(err);
624 }
625 }
626 }
627 let Some(store) = self
628 .session
629 .as_ref()
630 .and_then(|session| session.history_store())
631 else {
632 session_execution_lease
633 .release_if_live()
634 .await
635 .map_err(|err| {
636 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
637 })?;
638 return Ok(None);
639 };
640 let claim = if let Some(batch_ids) = selected_batch_ids {
641 store
642 .claim_ready_queued_work_by_batch_ids(
643 &self.state.session_id,
644 &session_execution_fence,
645 &self.runtime_lease_owner,
646 crate::QueuedWorkClaimBoundary::Idle,
647 crate::QUEUED_WORK_CLAIM_TTL_MS,
648 batch_ids,
649 )
650 .await
651 } else {
652 store
653 .claim_ready_queued_work(
654 &self.state.session_id,
655 &session_execution_fence,
656 &self.runtime_lease_owner,
657 crate::QueuedWorkClaimBoundary::Idle,
658 crate::QUEUED_WORK_CLAIM_TTL_MS,
659 64,
660 )
661 .await
662 }
663 .map_err(super::runtime_error_from_store_commit)?;
664 let Some(claim) = claim else {
665 session_execution_lease
666 .release_if_live()
667 .await
668 .map_err(|err| {
669 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
670 })?;
671 return Ok(None);
672 };
673 let mut work = claim.materialize_for_turn();
674 let turn_id = work
675 .input
676 .trace_turn_id
677 .clone()
678 .or_else(|| Some(opts.execution_scope_id().to_owned()))
679 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
680 work.input.trace_turn_id = Some(turn_id.clone());
681 let causes = work.turn_causes.clone();
682 emit_queued_work_started_to_sink(
683 opts.turn_events_or_noop(),
684 crate::QueuedWorkClaimBoundary::Idle,
685 &claim,
686 causes.clone(),
687 )
688 .await;
689 crate::trace::emit_trace(
690 &self.host.core.tracing.trace_sink,
691 &self.host.core.tracing.trace_context,
692 lash_trace::TraceContext::default()
693 .for_session(self.state.session_id.clone())
694 .for_turn_index(self.state.turn_index + 1)
695 .for_turn(turn_id.clone()),
696 lash_trace::TraceEvent::Custom {
697 name: "queued_work.claimed".to_string(),
698 payload: queued_work_trace_payload(
699 crate::QueuedWorkClaimBoundary::Idle,
700 &claim,
701 &causes,
702 ),
703 },
704 self.host.core.clock.as_ref(),
705 );
706 let claim_for_abandon = claim.clone();
707 let scoped_effect_controller = opts.scoped_effect_controller();
708 let result = self
709 .stream_turn_with_scoped_effect_controller_inner(
710 work.input,
711 opts.events_or_noop(),
712 opts.turn_events_or_noop(),
713 scoped_effect_controller,
714 cancel,
715 Some(claim),
716 Some(&session_execution_lease),
717 SessionExecutionLeaseReleasePolicy::FinalCommit,
718 )
719 .await
720 .map(Some);
721 if let Err(err) = &result {
722 self.abandon_queued_work_claims_after_lease_loss(
723 err,
724 std::slice::from_ref(&claim_for_abandon),
725 )
726 .await;
727 }
728 self.settle_session_execution_lease(Some(&session_execution_lease), result)
729 .await
730 }
731
732 fn ensure_durable_store_facets_for_scope(
740 &self,
741 scoped_effect_controller: &ScopedEffectController<'_>,
742 ) -> Result<(), RuntimeError> {
743 if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
744 {
745 return Ok(());
746 }
747 if self
748 .host
749 .core
750 .durability
751 .attachment_store
752 .persistence()
753 .durability_tier()
754 != crate::DurabilityTier::Durable
755 {
756 return Err(RuntimeError::durable_store_required(
757 crate::DurableStoreFacet::AttachmentStore,
758 ));
759 }
760 if self
761 .host
762 .core
763 .durability
764 .process_env_store
765 .durability_tier()
766 != crate::DurabilityTier::Durable
767 {
768 return Err(RuntimeError::durable_store_required(
769 crate::DurableStoreFacet::ProcessEnvStore,
770 ));
771 }
772 if let Some(store) = self
773 .session
774 .as_ref()
775 .and_then(|session| session.history_store())
776 && store.durability_tier() != crate::DurabilityTier::Durable
777 {
778 return Err(RuntimeError::durable_store_required(
779 crate::DurableStoreFacet::SessionStore,
780 ));
781 }
782 if let Some(process_registry) = self.host.process_registry.as_ref()
783 && process_registry.durability_tier() != crate::DurabilityTier::Durable
784 {
785 return Err(RuntimeError::durable_store_required(
786 crate::DurableStoreFacet::ProcessRegistry,
787 ));
788 }
789 Ok(())
790 }
791
792 #[allow(clippy::too_many_arguments)]
793 async fn stream_turn_with_scoped_effect_controller_inner(
794 &mut self,
795 mut input: TurnInput,
796 events: &dyn EventSink,
797 turn_events: &dyn TurnActivitySink,
798 scoped_effect_controller: ScopedEffectController<'_>,
799 cancel: CancellationToken,
800 queued_claim: Option<crate::QueuedWorkClaim>,
801 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
802 session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
803 ) -> Result<AssembledTurn, RuntimeError> {
804 if queued_claim.is_none() {
805 if let Some(lease) = session_execution_lease {
806 while self
807 .drain_next_session_command(&lease.fence())
808 .await?
809 .is_some()
810 {}
811 } else if self
812 .session
813 .as_ref()
814 .and_then(|session| session.history_store())
815 .is_some()
816 {
817 return Err(RuntimeError::new(
818 RuntimeErrorCode::StoreCommitFailed,
819 "session command drain requires a session execution lease",
820 ));
821 }
822 }
823 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
824 && scoped_effect_controller
825 .execution_scope()
826 .validates_turn_trace_id()
827 && input_turn_id != scoped_effect_controller.scope_id()
828 {
829 return Err(RuntimeError::new(
830 RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
831 format!(
832 "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
833 scoped_effect_controller.scope_id()
834 ),
835 ));
836 }
837 self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
838 input
839 .trace_turn_id
840 .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
841 self.stream_turn_inner(
842 input.clone(),
843 events,
844 turn_events,
845 scoped_effect_controller,
846 cancel.clone(),
847 queued_claim,
848 session_execution_lease,
849 session_execution_lease_release_policy,
850 )
851 .await
852 }
853
854 pub async fn stream_turn_with_agent_frames(
863 &mut self,
864 input: TurnInput,
865 opts: TurnOptions<'_>,
866 ) -> Result<AgentFrameRun, RuntimeError> {
867 let cancel = opts.cancel.clone();
868 let session_execution_lease = self
869 .claim_session_execution_lease(cancel.clone(), true)
870 .await?;
871 let scoped_effect_controller = opts.scoped_effect_controller();
872 let result = self
873 .stream_turn_with_agent_frames_inner(
874 input,
875 opts.events_or_noop(),
876 opts.turn_events_or_noop(),
877 scoped_effect_controller,
878 cancel,
879 session_execution_lease.as_ref(),
880 )
881 .await;
882 self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
883 .await
884 }
885
886 async fn stream_turn_with_agent_frames_inner(
887 &mut self,
888 mut input: TurnInput,
889 events: &dyn EventSink,
890 turn_events: &dyn TurnActivitySink,
891 scoped_effect_controller: ScopedEffectController<'_>,
892 cancel: CancellationToken,
893 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
894 ) -> Result<AgentFrameRun, RuntimeError> {
895 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
896 && scoped_effect_controller
897 .execution_scope()
898 .validates_turn_trace_id()
899 && input_turn_id != scoped_effect_controller.scope_id()
900 {
901 return Err(RuntimeError::new(
902 RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
903 format!(
904 "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
905 scoped_effect_controller.scope_id()
906 ),
907 ));
908 }
909 let follow_protocol_turn_options = input.protocol_turn_options.clone();
910 let follow_turn_context = input.turn_context.clone();
911 let follow_trace_turn_id = input
912 .trace_turn_id
913 .clone()
914 .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
915 input
916 .trace_turn_id
917 .get_or_insert(follow_trace_turn_id.clone());
918 let mut turns = Vec::new();
919 loop {
920 let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
921 input.trace_turn_id = Some(turn_trace_turn_id.clone());
922 let turn_effect_controller = if turns.is_empty() {
923 scoped_effect_controller.clone()
924 } else {
925 ScopedEffectController::borrowed(
926 scoped_effect_controller.controller(),
927 ExecutionScope::turn(&self.state.session_id, &turn_trace_turn_id),
928 )?
929 };
930 let turn = self
931 .stream_turn_with_scoped_effect_controller_inner(
932 input,
933 events,
934 turn_events,
935 turn_effect_controller,
936 cancel.clone(),
937 None,
938 session_execution_lease,
939 SessionExecutionLeaseReleasePolicy::KeepOnAgentFrameSwitch,
940 )
941 .await?;
942 let switched_frame = match &turn.outcome {
943 TurnOutcome::AgentFrameSwitch { frame_id, task } => {
944 Some((frame_id.clone(), task.clone()))
945 }
946 _ => None,
947 };
948 turns.push(turn);
949
950 let Some((_frame_id, task)) = switched_frame else {
951 return Ok(AgentFrameRun { turns });
952 };
953 input = turn_input_from_text(task);
954 input.protocol_turn_options = follow_protocol_turn_options.clone();
955 input.turn_context = follow_turn_context.clone();
956 }
957 }
958
959 #[allow(clippy::too_many_arguments)]
960 async fn stream_turn_inner(
961 &mut self,
962 mut input: TurnInput,
963 events: &dyn EventSink,
964 turn_events: &dyn TurnActivitySink,
965 scoped_effect_controller: ScopedEffectController<'_>,
966 cancel: CancellationToken,
967 queued_claim: Option<crate::QueuedWorkClaim>,
968 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
969 session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
970 ) -> Result<AssembledTurn, RuntimeError> {
971 self.refresh_session_graph_from_store()
972 .await
973 .map_err(session_head_refresh_error)?;
974 let input_trace_turn_id = input.trace_turn_id.clone();
975 let queued_turn_work = queued_claim
976 .as_ref()
977 .map(crate::QueuedWorkClaim::materialize_for_turn);
978 if let Some(work) = queued_turn_work.as_ref()
979 && input.items.is_empty()
980 && input.image_blobs.is_empty()
981 {
982 input = work.input.clone();
983 if input.trace_turn_id.is_none() {
984 input.trace_turn_id = input_trace_turn_id;
985 }
986 }
987 if self
988 .session
989 .as_ref()
990 .and_then(|session| session.history_store())
991 .is_some()
992 {
993 ensure_durable_effect_input(&input)?;
994 }
995 if let Some(extension) = &input.protocol_extension
996 && let Some(session) = self.session.as_ref()
997 {
998 let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
999 protocol_session
1000 .validate_turn_extension(extension)
1001 .await
1002 .map_err(|err| {
1003 RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
1004 })?;
1005 }
1006 let previous_prompt_usage = self.state.last_prompt_usage.clone();
1007 let normalized = match self
1008 .normalize_input_items(&input.items, &input.image_blobs)
1009 .await
1010 {
1011 Ok(items) => items,
1012 Err(e) => {
1013 self.state.last_prompt_usage = None;
1014 let mut assembler = TurnAssembler::default();
1015 let error_event = SessionEvent::Error {
1016 message: e.clone(),
1017 envelope: Some(crate::session_model::ErrorEnvelope {
1018 kind: "input_validation".to_string(),
1019 code: Some("invalid_turn_input".to_string()),
1020 terminal_reason: None,
1021 user_message: e.clone(),
1022 raw: None,
1023 }),
1024 };
1025 assembler.push(&error_event);
1026 emit_turn_activity_to_sink(
1027 turn_events,
1028 TurnActivity::independent(TurnEvent::Error { message: e }),
1029 )
1030 .await;
1031 emit_session_event_to_sink(events, error_event).await;
1032 let outcome_event = SessionEvent::TurnOutcome {
1033 outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
1034 };
1035 assembler.push(&outcome_event);
1036 emit_session_event_to_sink(events, outcome_event).await;
1037 assembler.push(&SessionEvent::Done);
1038 emit_session_event_to_sink(events, SessionEvent::Done).await;
1039 return Ok(assembler.finish(
1040 self.state.to_snapshot(),
1041 false,
1042 None,
1043 &self.host.core.control.termination,
1044 ));
1045 }
1046 };
1047 let turn_index = self.state.turn_index + 1;
1048 let trace_turn_id = input
1049 .trace_turn_id
1050 .clone()
1051 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1052 if self.host.core.tracing.trace_sink.is_some() {
1053 let mut trace_metadata = std::collections::BTreeMap::new();
1054 trace_metadata.insert(
1055 "input_item_count".to_string(),
1056 serde_json::json!(normalized.len()),
1057 );
1058 crate::trace::emit_trace(
1059 &self.host.core.tracing.trace_sink,
1060 &self.host.core.tracing.trace_context,
1061 lash_trace::TraceContext::default()
1062 .for_session(self.state.session_id.clone())
1063 .for_turn_index(turn_index)
1064 .for_turn(trace_turn_id.clone()),
1065 lash_trace::TraceEvent::TurnStarted {
1066 metadata: trace_metadata,
1067 },
1068 self.host.core.clock.as_ref(),
1069 );
1070 }
1071
1072 let base_read_model = self.state.read_model();
1073 let base_messages = base_read_model.messages;
1074 let base_render_cache = base_read_model.prompt_render_cache;
1075 let mut turn_delta = Vec::new();
1076 let initial_turn_causes = queued_turn_work
1077 .as_ref()
1078 .map(|work| work.turn_causes.clone())
1079 .unwrap_or_default();
1080 turn_delta.extend(
1081 initial_turn_causes
1082 .iter()
1083 .map(crate::TurnCause::to_event_message),
1084 );
1085
1086 let user_id = fresh_message_id();
1087 let mut user_parts: Vec<Part> = Vec::new();
1088 for item in normalized {
1089 match item {
1090 NormalizedItem::Text(text) => {
1091 if text.is_empty() {
1092 continue;
1093 }
1094 user_parts.push(Part {
1095 id: format!("{}.p{}", user_id, user_parts.len()),
1096 kind: PartKind::Text,
1097 content: text,
1098 attachment: None,
1099 tool_call_id: None,
1100 tool_name: None,
1101 tool_replay: None,
1102 prune_state: PruneState::Intact,
1103 reasoning_meta: None,
1104 response_meta: None,
1105 });
1106 }
1107 NormalizedItem::Image(reference) => {
1108 user_parts.push(Part {
1109 id: format!("{}.p{}", user_id, user_parts.len()),
1110 kind: PartKind::Image,
1111 content: String::new(),
1112 attachment: Some(crate::session_model::message::PartAttachment {
1113 reference,
1114 }),
1115 tool_call_id: None,
1116 tool_name: None,
1117 tool_replay: None,
1118 prune_state: PruneState::Intact,
1119 reasoning_meta: None,
1120 response_meta: None,
1121 });
1122 }
1123 }
1124 }
1125 if user_parts.is_empty() && initial_turn_causes.is_empty() {
1126 user_parts.push(Part {
1127 id: format!("{}.p0", user_id),
1128 kind: PartKind::Text,
1129 content: String::new(),
1130 attachment: None,
1131 tool_call_id: None,
1132 tool_name: None,
1133 tool_replay: None,
1134 prune_state: PruneState::Intact,
1135 reasoning_meta: None,
1136 response_meta: None,
1137 });
1138 }
1139 if !user_parts.is_empty() {
1140 reassign_part_ids(&user_id, &mut user_parts);
1141 turn_delta.push(Message {
1142 id: user_id.clone(),
1143 role: MessageRole::User,
1144 parts: shared_parts(user_parts),
1145 origin: None,
1146 });
1147 }
1148
1149 let manager = self
1150 .runtime_session_services_for_turn(None)
1151 .map_err(|err| {
1152 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1153 })?;
1154 let plugin_session = self
1155 .session
1156 .as_ref()
1157 .map(|s| Arc::clone(s.plugins()))
1158 .ok_or_else(|| {
1159 RuntimeError::new(
1160 RuntimeErrorCode::ContextPrepareTurn,
1161 "runtime session not available",
1162 )
1163 })?;
1164 let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
1165 let prepare_phase_controller = scoped_child_turn_controller(
1166 &scoped_effect_controller,
1167 &self.state.session_id,
1168 &prepare_phase_turn_id,
1169 )?;
1170 let turn_ctx = crate::TurnTransformContext {
1171 session_id: self.state.session_id.clone(),
1172 state: self.read_view(),
1173 prompt_usage: previous_prompt_usage.clone(),
1174 max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
1175 sessions: manager.state_service(),
1176 session_lifecycle: manager.lifecycle_service(),
1177 session_graph: manager.graph_service(),
1178 scoped_effect_controller: scoped_effect_controller.clone(),
1179 direct_completions: manager.direct_completion_client(
1180 RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
1181 Some(prepare_phase_turn_id),
1182 ),
1183 };
1184 self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
1185 let prepared_context = plugin_session
1186 .prepare_turn_context(
1187 &turn_ctx,
1188 crate::session_model::context::PreparedContext {
1189 messages: crate::MessageSequence::from_base_and_delta(
1190 base_messages,
1191 turn_delta,
1192 )
1193 .with_base_render_cache(base_render_cache),
1194 ..Default::default()
1195 },
1196 self.turn_phase_probe.clone(),
1197 )
1198 .await
1199 .map_err(|err| {
1200 RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
1201 })?;
1202 self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
1203 drop(turn_ctx);
1208 let messages = prepared_context.messages;
1209 if let Some(session) = self.session.as_mut() {
1210 session
1211 .set_context_overlay(
1212 prepared_context.tool_providers,
1213 prepared_context.prompt_contributions,
1214 prepared_context.include_base_tools,
1215 )
1216 .map_err(|err| {
1217 RuntimeError::new(
1218 RuntimeErrorCode::Other("session_tool_registry".to_string()),
1219 err.to_string(),
1220 )
1221 })?;
1222 }
1223
1224 self.state.last_prompt_usage = None;
1225 Box::pin(self.stream_prepared_turn_inner(
1226 messages,
1227 previous_prompt_usage,
1228 input.protocol_turn_options.clone(),
1229 input.protocol_extension.clone(),
1230 input.turn_context.clone(),
1231 initial_turn_causes,
1232 trace_turn_id,
1233 turn_index,
1234 events,
1235 turn_events,
1236 scoped_effect_controller,
1237 cancel,
1238 queued_claim,
1239 session_execution_lease,
1240 session_execution_lease_release_policy,
1241 ))
1242 .await
1243 }
1244
1245 pub async fn run_turn_assembled(
1247 &mut self,
1248 input: TurnInput,
1249 cancel: CancellationToken,
1250 scoped_effect_controller: ScopedEffectController<'_>,
1251 ) -> Result<AssembledTurn, RuntimeError> {
1252 self.stream_turn(input, TurnOptions::new(cancel, scoped_effect_controller))
1253 .await
1254 }
1255
1256 #[allow(clippy::too_many_arguments)]
1258 pub async fn stream_prepared_turn(
1259 &mut self,
1260 messages: crate::MessageSequence,
1261 previous_prompt_usage: Option<PromptUsage>,
1262 protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1263 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1264 turn_context: crate::TurnContext,
1265 initial_turn_causes: Vec<crate::TurnCause>,
1266 trace_turn_id: String,
1267 turn_index: usize,
1268 events: &dyn EventSink,
1269 turn_events: &dyn TurnActivitySink,
1270 scoped_effect_controller: ScopedEffectController<'_>,
1271 cancel: CancellationToken,
1272 initial_queue_claim: Option<crate::QueuedWorkClaim>,
1273 ) -> Result<AssembledTurn, RuntimeError> {
1274 let session_execution_lease = self
1275 .claim_session_execution_lease(cancel.clone(), true)
1276 .await?;
1277 let result = self
1278 .stream_prepared_turn_inner(
1279 messages,
1280 previous_prompt_usage,
1281 protocol_turn_options,
1282 protocol_extension,
1283 turn_context,
1284 initial_turn_causes,
1285 trace_turn_id,
1286 turn_index,
1287 events,
1288 turn_events,
1289 scoped_effect_controller,
1290 cancel,
1291 initial_queue_claim,
1292 session_execution_lease.as_ref(),
1293 SessionExecutionLeaseReleasePolicy::FinalCommit,
1294 )
1295 .await;
1296 self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
1297 .await
1298 }
1299
1300 #[allow(clippy::too_many_arguments)]
1301 async fn stream_prepared_turn_inner(
1302 &mut self,
1303 messages: crate::MessageSequence,
1304 _previous_prompt_usage: Option<PromptUsage>,
1305 protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1306 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1307 turn_context: crate::TurnContext,
1308 initial_turn_causes: Vec<crate::TurnCause>,
1309 trace_turn_id: String,
1310 turn_index: usize,
1311 events: &dyn EventSink,
1312 turn_events: &dyn TurnActivitySink,
1313 scoped_effect_controller: ScopedEffectController<'_>,
1314 cancel: CancellationToken,
1315 initial_queue_claim: Option<crate::QueuedWorkClaim>,
1316 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
1317 session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
1318 ) -> Result<AssembledTurn, RuntimeError> {
1319 if session_execution_lease.is_none()
1320 && self
1321 .session
1322 .as_ref()
1323 .and_then(|session| session.history_store())
1324 .is_some()
1325 {
1326 return Err(RuntimeError::new(
1327 RuntimeErrorCode::StoreCommitFailed,
1328 "prepared turn requires a session execution lease",
1329 ));
1330 }
1331 let session_execution_fence =
1332 session_execution_lease.map(SessionExecutionLeaseGuard::fence);
1333 let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
1334 let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
1335 let mut turn_policy = self.state.effective_policy().clone();
1336 let turn_provider_override = turn_context.provider().cloned();
1337 if let Some(provider) = turn_provider_override.as_ref() {
1338 turn_policy.provider_id = provider.kind().to_string();
1339 }
1340 if let Some(model) = turn_context.model_spec() {
1341 turn_policy.model = model.clone();
1342 }
1343 let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
1344 let effective_protocol_turn_options = protocol_turn_options
1345 .clone()
1346 .map(|options| session_protocol_turn_options.merged_with_override(&options))
1347 .unwrap_or(session_protocol_turn_options);
1348 let manager = self
1349 .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1350 .map_err(|err| {
1351 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1352 })?;
1353 let plugins = {
1354 let session = self
1355 .session
1356 .as_ref()
1357 .expect("lash runtime session must be available");
1358 Arc::clone(session.plugins())
1359 };
1360 let mut assembler = TurnAssembler::new();
1361 self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
1362 let prepared = {
1368 let prepare_turn = plugins.prepare_turn_with_phase_probe(
1369 PrepareTurnRequest {
1370 session_id: self.state.session_id.clone(),
1371 state: crate::SessionReadView::from_runtime_state(
1372 &self.state,
1373 turn_policy.clone(),
1374 effective_protocol_turn_options.clone(),
1375 ),
1376 messages,
1377 sessions: manager.state_service(),
1378 session_lifecycle: manager.lifecycle_service(),
1379 session_graph: manager.graph_service(),
1380 turn_context: turn_context.clone(),
1381 },
1382 self.turn_phase_probe.clone(),
1383 );
1384 let mut prepare_turn = Box::pin(prepare_turn);
1385
1386 loop {
1387 tokio::select! {
1388 prepared = prepare_turn.as_mut() => {
1389 let prepared = prepared.map_err(|err| {
1390 RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
1391 })?;
1392 self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
1393 break prepared;
1394 }
1395 maybe_event = event_rx.recv() => {
1396 if let Some(event) = maybe_event {
1397 emit_runtime_stream_event_to_sinks(
1398 events,
1399 turn_events,
1400 event,
1401 &mut assembler,
1402 )
1403 .await;
1404 }
1405 }
1406 }
1407 }
1408 };
1409 for event in &prepared.events {
1410 assembler.push(event);
1411 }
1412 emit_session_events_to_sink(events, prepared.events).await;
1413 if let Some(abort) = prepared.abort {
1414 drop(event_tx);
1415
1416 let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1417 self.state.clone(),
1418 Arc::clone(&self.host.core.clock),
1419 );
1420 turn_pipeline.apply_prepared_messages(&prepared.messages);
1421 let state = turn_pipeline.into_final_state();
1422 let issue = TurnIssue {
1423 kind: "plugin".to_string(),
1424 code: Some(abort.code),
1425 terminal_reason: None,
1426 message: abort.message.clone(),
1427 raw: None,
1428 };
1429 let error_event = SessionEvent::Error {
1430 message: abort.message,
1431 envelope: Some(crate::session_model::ErrorEnvelope {
1432 kind: "plugin".to_string(),
1433 code: issue.code.clone(),
1434 terminal_reason: None,
1435 user_message: issue.message.clone(),
1436 raw: None,
1437 }),
1438 };
1439 assembler.push(&error_event);
1440 emit_turn_activity_to_sink(
1441 turn_events,
1442 TurnActivity::independent(TurnEvent::Error {
1443 message: issue.message.clone(),
1444 }),
1445 )
1446 .await;
1447 emit_session_event_to_sink(events, error_event).await;
1448 let outcome_event = SessionEvent::TurnOutcome {
1449 outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
1450 };
1451 assembler.push(&outcome_event);
1452 emit_session_event_to_sink(events, outcome_event).await;
1453 assembler.push(&SessionEvent::Done);
1454 emit_session_event_to_sink(events, SessionEvent::Done).await;
1455 return Ok(assembler.finish(
1456 state.to_snapshot(),
1457 cancel.is_cancelled(),
1458 Some(issue),
1459 &self.host.core.control.termination,
1460 ));
1461 }
1462 let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1463 self.state.clone(),
1464 Arc::clone(&self.host.core.clock),
1465 )
1466 .with_session_execution_lease(session_execution_fence.clone());
1467 let store = self
1468 .session
1469 .as_ref()
1470 .and_then(|session| session.history_store());
1471 let progress_store = if scoped_effect_controller.controller().durability_tier()
1475 == crate::DurabilityTier::Durable
1476 {
1477 None
1478 } else {
1479 store.as_ref().map(|store| store.as_ref())
1480 };
1481 turn_pipeline
1482 .prepared_checkpoint(
1483 progress_store,
1484 turn_policy.clone(),
1485 turn_index,
1486 &prepared.messages,
1487 self.session.as_mut(),
1488 )
1489 .await
1490 .map_err(super::runtime_error_from_store_commit)?;
1491 let resolved_turn_policy = if let Some(provider) = turn_provider_override {
1492 RuntimeSessionPolicy::from_provider(
1493 turn_policy.clone(),
1494 provider.with_clock(Arc::clone(&self.host.core.clock)),
1495 )
1496 .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1497 } else {
1498 self.host
1499 .resolve_session_policy(&self.state.session_id, turn_policy.clone())
1500 .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1501 };
1502 let manager = self
1503 .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1504 .map_err(|err| {
1505 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1506 })?;
1507 let cancel_state = cancel.clone();
1508 let finish_scoped_effect_controller = scoped_effect_controller.clone();
1509 let session = self
1510 .session
1511 .take()
1512 .expect("lash runtime session must be available");
1513 let mut driver = RuntimeTurnDriver {
1514 session,
1515 policy: resolved_turn_policy,
1516 host: self.host.clone(),
1517 turn_id: scoped_effect_controller.scope_id().to_string(),
1518 scoped_effect_controller,
1519 session_id: self.state.session_id.clone(),
1520 turn_index,
1521 turn_pipeline,
1522 llm_stream_summaries: HashMap::new(),
1523 next_llm_ordinal: 0,
1524 session_services: manager,
1525 protocol_turn_options: effective_protocol_turn_options,
1526 protocol_extension,
1527 turn_context,
1528 turn_causes: initial_turn_causes,
1529 pending_queue_claims: initial_queue_claim.into_iter().collect(),
1530 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1531 session_execution_lease: session_execution_fence,
1532 runtime_lease_owner: self.runtime_lease_owner.clone(),
1533 turn_phase_probe: self.turn_phase_probe.clone(),
1534 };
1535 let protocol_run_offset = 0;
1536 self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
1537 let run_result = drive_turn_to_completion(
1538 driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
1539 &mut event_rx,
1540 &mut assembler,
1541 &child_usage_event_relay,
1542 events,
1543 turn_events,
1544 )
1545 .await;
1546 let (new_messages, _new_protocol_iteration) = match run_result {
1547 Ok(result) => result,
1548 Err(err) => {
1549 self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1550 let RuntimeTurnDriver {
1551 session,
1552 pending_queue_claims,
1553 ..
1554 } = driver;
1555 self.session = Some(session);
1556 self.abandon_queued_work_claims_after_lease_loss(&err, &pending_queue_claims)
1557 .await;
1558 return Err(err);
1559 }
1560 };
1561 self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1562 tracing::debug!(
1563 new_message_count = new_messages.len(),
1564 tool_call_count = assembler.tool_calls.len(),
1565 "runtime post-run_task"
1566 );
1567
1568 let RuntimeTurnDriver {
1569 session,
1570 policy,
1571 turn_pipeline,
1572 pending_queue_claims,
1573 ..
1574 } = driver;
1575 self.session = Some(session);
1576 let pending_queue_claims_for_abandon = pending_queue_claims.clone();
1577 let finish_result = self
1578 .finish_turn(
1579 TurnFinishInput {
1580 turn_pipeline,
1581 assembler,
1582 new_messages,
1583 policy,
1584 turn_index,
1585 queued_work_completions: pending_queue_claims
1586 .iter()
1587 .map(crate::QueuedWorkClaim::completion)
1588 .collect(),
1589 trace_turn_id,
1590 },
1591 events,
1592 &finish_scoped_effect_controller,
1593 &cancel_state,
1594 session_execution_lease,
1595 session_execution_lease_release_policy,
1596 )
1597 .await;
1598 if let Err(err) = &finish_result {
1599 self.abandon_queued_work_claims_after_lease_loss(
1600 err,
1601 &pending_queue_claims_for_abandon,
1602 )
1603 .await;
1604 }
1605 finish_result
1606 }
1607 async fn normalize_input_items(
1608 &self,
1609 items: &[InputItem],
1610 image_blobs: &HashMap<String, Vec<u8>>,
1611 ) -> Result<Vec<NormalizedItem>, String> {
1612 normalize_input_items(
1613 items,
1614 image_blobs,
1615 self.host.core.durability.attachment_store.as_ref(),
1616 )
1617 .await
1618 }
1619}
1620
1621fn turn_input_from_text(text: String) -> TurnInput {
1622 TurnInput {
1623 items: vec![InputItem::Text { text }],
1624 image_blobs: HashMap::new(),
1625 protocol_turn_options: None,
1626 trace_turn_id: None,
1627 protocol_extension: None,
1628 turn_context: crate::TurnContext::default(),
1629 }
1630}
1631
1632fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
1633 if completed_turn_count == 0 {
1634 root_turn_id.to_string()
1635 } else {
1636 format!("{root_turn_id}:agent-frame:{completed_turn_count}")
1637 }
1638}
1639
1640pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1641 if input.protocol_extension.is_some() {
1642 return Err(RuntimeError::new(
1643 RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1644 "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1645 ));
1646 }
1647 input
1648 .turn_context
1649 .live_plugin_inputs()
1650 .durable_effect_rejection()?;
1651 Ok(())
1652}
1653
1654async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1655 if !events.is_noop() {
1656 events.emit(activity).await;
1657 }
1658}
1659
1660async fn drive_turn_to_completion<F>(
1670 run_future: F,
1671 event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1672 assembler: &mut TurnAssembler,
1673 child_usage_event_relay: &ChildUsageEventRelay,
1674 events: &dyn EventSink,
1675 turn_events: &dyn TurnActivitySink,
1676) -> Result<(crate::MessageSequence, usize), RuntimeError>
1677where
1678 F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1679{
1680 let run_result = {
1681 let mut run_future = Box::pin(run_future);
1682 loop {
1683 tokio::select! {
1684 maybe_event = event_rx.recv() => {
1685 if let Some(event) = maybe_event {
1686 emit_runtime_stream_event_to_sinks(
1687 events,
1688 turn_events,
1689 event,
1690 assembler,
1691 )
1692 .await;
1693 }
1694 }
1695 completed = run_future.as_mut() => {
1696 child_usage_event_relay.clear();
1697 break completed;
1698 }
1699 }
1700 }
1701 };
1702 while let Some(event) = event_rx.recv().await {
1703 emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1704 }
1705 run_result
1706}
1707
1708async fn emit_runtime_stream_event_to_sinks(
1709 events: &dyn EventSink,
1710 turn_events: &dyn TurnActivitySink,
1711 event: RuntimeStreamEvent,
1712 assembler: &mut TurnAssembler,
1713) {
1714 match event {
1715 RuntimeStreamEvent::Session(event) => {
1716 assembler.push(&event);
1717 emit_session_event_to_sink(events, event).await;
1718 }
1719 RuntimeStreamEvent::Turn(activity) => {
1720 assembler.push_turn_activity(&activity);
1721 emit_turn_activity_to_sink(turn_events, activity).await;
1722 }
1723 }
1724}
1725
1726#[cfg(test)]
1727mod tests {
1728 use super::agent_frame_follow_turn_id;
1729
1730 #[test]
1731 fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
1732 assert_eq!(agent_frame_follow_turn_id("root-turn", 0), "root-turn");
1733 assert_eq!(
1734 agent_frame_follow_turn_id("root-turn", 1),
1735 "root-turn:agent-frame:1"
1736 );
1737 assert_eq!(
1738 agent_frame_follow_turn_id("root-turn", 2),
1739 "root-turn:agent-frame:2"
1740 );
1741 }
1742}