1use super::*;
2
3fn trace_fields_from_outcome(
4 outcome: &TurnOutcome,
5) -> (
6 &'static str,
7 &'static str,
8 Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10 match outcome {
11 TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12 ("completed", "assistant_message", None)
13 }
14 TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
15 ("completed", "submitted_value", None)
16 }
17 TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
18 TurnOutcome::AgentFrameSwitch { frame_id, .. } => (
19 "completed",
20 "agent_frame_switch",
21 Some(lash_trace::TraceAgentFrameSwitch {
22 frame_id: frame_id.clone(),
23 }),
24 ),
25 TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
26 }
27}
28
29fn trace_stop_reason(stop: &TurnStop) -> &'static str {
30 match stop {
31 TurnStop::Cancelled => "cancelled",
32 TurnStop::Incomplete => "incomplete",
33 TurnStop::InvalidInput => "invalid_input",
34 TurnStop::MaxTurns => "max_turns",
35 TurnStop::ToolFailure => "tool_failure",
36 TurnStop::ProviderError => "provider_error",
37 TurnStop::PluginAbort => "plugin_abort",
38 TurnStop::RuntimeError => "runtime_error",
39 TurnStop::SubmittedError { .. } => "submitted_error",
40 TurnStop::ToolError { .. } => "tool_error",
41 }
42}
43
44fn session_head_refresh_error(err: SessionError) -> RuntimeError {
45 RuntimeError::new(
46 RuntimeErrorCode::Other("session_head_refresh".to_string()),
47 err.to_string(),
48 )
49}
50
51#[derive(Clone, Copy)]
52enum SessionExecutionLeaseReleasePolicy {
53 FinalCommit,
54 KeepOnAgentFrameSwitch,
55}
56
57impl SessionExecutionLeaseReleasePolicy {
58 fn should_release(self, outcome: &TurnOutcome) -> bool {
59 match self {
60 Self::FinalCommit => true,
61 Self::KeepOnAgentFrameSwitch => {
62 !matches!(outcome, TurnOutcome::AgentFrameSwitch { .. })
63 }
64 }
65 }
66}
67
68fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
69 match payload {
70 crate::QueuedWorkPayload::TurnInput { .. } => "turn_input",
71 crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
72 crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
73 }
74}
75
76fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
77 claim
78 .batches
79 .iter()
80 .map(|batch| batch.batch_id.clone())
81 .collect()
82}
83
/// Builds the id of a turn phase by joining the parent turn id and the phase
/// name with a colon, e.g. `"turn-1:prepare-turn"`.
fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
    [parent_turn_id, phase].join(":")
}
87
88fn scoped_child_turn_controller<'run>(
89 scoped_effect_controller: &'run ScopedEffectController<'_>,
90 session_id: &str,
91 turn_id: &str,
92) -> Result<ScopedEffectController<'run>, RuntimeError> {
93 ScopedEffectController::borrowed(
94 scoped_effect_controller.controller(),
95 ExecutionScope::turn(session_id, turn_id),
96 )
97}
98
99pub(in crate::runtime) fn queued_work_trace_payload(
100 boundary: crate::QueuedWorkClaimBoundary,
101 claim: &crate::QueuedWorkClaim,
102 causes: &[crate::TurnCause],
103) -> serde_json::Value {
104 serde_json::json!({
105 "boundary": boundary,
106 "claim_id": claim.claim_id,
107 "owner_id": claim.owner.owner_id,
108 "incarnation_id": claim.owner.incarnation_id,
109 "batch_ids": queued_work_batch_ids(claim),
110 "payload_types": claim.batches.iter()
111 .flat_map(|batch| batch.items.iter())
112 .map(|item| queued_work_payload_type(&item.payload))
113 .collect::<Vec<_>>(),
114 "causes": causes,
115 })
116}
117
118pub(in crate::runtime) fn queued_work_completion_trace_payload(
119 completions: &[crate::QueuedWorkCompletion],
120) -> serde_json::Value {
121 serde_json::json!({
122 "claims": completions.iter().map(|completion| {
123 serde_json::json!({
124 "session_id": completion.session_id,
125 "claim_id": completion.claim_id,
126 "batch_ids": completion.batch_ids,
127 })
128 }).collect::<Vec<_>>(),
129 })
130}
131
132async fn emit_queued_work_started_to_sink(
133 events: &dyn TurnActivitySink,
134 boundary: crate::QueuedWorkClaimBoundary,
135 claim: &crate::QueuedWorkClaim,
136 causes: Vec<crate::TurnCause>,
137) {
138 emit_turn_activity_to_sink(
139 events,
140 TurnActivity::independent(TurnEvent::QueuedWorkStarted {
141 boundary,
142 batch_ids: queued_work_batch_ids(claim),
143 causes,
144 }),
145 )
146 .await;
147}
148
149pub(in crate::runtime) async fn send_queued_work_started_event(
150 event_tx: &mpsc::Sender<RuntimeStreamEvent>,
151 boundary: crate::QueuedWorkClaimBoundary,
152 claim: &crate::QueuedWorkClaim,
153 causes: Vec<crate::TurnCause>,
154) {
155 send_turn_activity(
156 event_tx,
157 TurnActivityId::fresh(),
158 TurnEvent::QueuedWorkStarted {
159 boundary,
160 batch_ids: queued_work_batch_ids(claim),
161 causes,
162 },
163 )
164 .await;
165}
166
/// Values handed to `LashRuntime::finish_turn`, which destructures this
/// struct to finalize, commit and trace a turn.
struct TurnFinishInput {
    // Turn boundary whose state is finalized and then committed.
    turn_pipeline: TurnBoundary,
    // Accumulated turn output; supplies token usage and produces the
    // assembled turn.
    assembler: TurnAssembler,
    // Messages passed to the pipeline when finalizing the turn read state.
    new_messages: crate::MessageSequence,
    // Policy whose model id and provider are used when recording token usage.
    policy: RuntimeSessionPolicy,
    // Index written into the pipeline state for this turn.
    turn_index: usize,
    // Queued work completions forwarded to the final commit and traced
    // afterwards.
    queued_work_completions: Vec<crate::QueuedWorkCompletion>,
    // Turn id used for the commit and for trace events.
    trace_turn_id: String,
}
176
177impl LashRuntime {
178 fn max_context_tokens(&self) -> usize {
179 self.state.effective_policy().context_window_tokens()
180 }
181
182 async fn claim_session_execution_lease(
183 &self,
184 cancel: CancellationToken,
185 busy_is_error: bool,
186 ) -> Result<Option<SessionExecutionLeaseGuard>, RuntimeError> {
187 let Some(store) = self
188 .session
189 .as_ref()
190 .and_then(|session| session.history_store())
191 else {
192 return Ok(None);
193 };
194 match SessionExecutionLeaseGuard::try_acquire(
195 store,
196 &self.state.session_id,
197 &self.runtime_lease_owner,
198 Arc::clone(&self.host.core.clock),
199 cancel,
200 )
201 .await
202 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?
203 {
204 Some(lease) => Ok(Some(lease)),
205 None if busy_is_error => Err(RuntimeError::new(
206 RuntimeErrorCode::SessionExecutionBusy,
207 format!(
208 "session `{}` is already executing on another runtime owner",
209 self.state.session_id
210 ),
211 )),
212 None => Ok(None),
213 }
214 }
215
216 async fn settle_session_execution_lease<T>(
217 &self,
218 guard: Option<&SessionExecutionLeaseGuard>,
219 result: Result<T, RuntimeError>,
220 ) -> Result<T, RuntimeError> {
221 match result {
222 Ok(value) => {
223 if let Some(guard) = guard {
224 guard.release_if_live().await.map_err(|err| {
225 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
226 })?;
227 }
228 Ok(value)
229 }
230 Err(err) => {
231 if err.code != RuntimeErrorCode::StoreCommitFailed
232 && let Some(guard) = guard
233 && let Err(release_err) = guard.release_if_live().await
234 {
235 tracing::warn!(
236 error = %release_err,
237 "failed to release session execution lease after runtime error"
238 );
239 }
240 Err(err)
241 }
242 }
243 }
244
245 async fn ensure_session_execution_lease_live(
246 &self,
247 guard: Option<&SessionExecutionLeaseGuard>,
248 ) -> Result<(), RuntimeError> {
249 let Some(guard) = guard else {
250 return Ok(());
251 };
252 guard.refresh_or_mark_lost().await.map_err(|err| {
253 RuntimeError::new(
254 RuntimeErrorCode::SessionExecutionLeaseLost,
255 format!(
256 "session execution lease for session `{}` was lost before commit: {err}",
257 self.state.session_id
258 ),
259 )
260 })
261 }
262
263 async fn abandon_queued_work_claims_after_lease_loss(
264 &self,
265 err: &RuntimeError,
266 claims: &[crate::QueuedWorkClaim],
267 ) {
268 if err.code != RuntimeErrorCode::SessionExecutionLeaseLost || claims.is_empty() {
269 return;
270 }
271 let Some(store) = self
272 .session
273 .as_ref()
274 .and_then(|session| session.history_store())
275 else {
276 return;
277 };
278 for claim in claims {
279 if let Err(abandon_err) = store.abandon_queued_work_claim(claim).await {
280 tracing::warn!(
281 error = %abandon_err,
282 session_id = %claim.session_id,
283 claim_id = %claim.claim_id,
284 "failed to abandon queued work claim after session execution lease loss"
285 );
286 }
287 }
288 }
289
290 #[doc(hidden)]
291 pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
292 self.turn_phase_probe = Some(probe);
293 }
294
295 fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
296 if let Some(probe) = self.turn_phase_probe.as_ref() {
297 probe.begin(phase);
298 }
299 }
300
301 fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
302 if let Some(probe) = self.turn_phase_probe.as_ref() {
303 probe.end(phase);
304 }
305 }
306
307 async fn finish_turn(
308 &mut self,
309 finish: TurnFinishInput,
310 events: &dyn EventSink,
311 scoped_effect_controller: &ScopedEffectController<'_>,
312 cancel_state: &CancellationToken,
313 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
314 session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
315 ) -> Result<AssembledTurn, RuntimeError> {
316 let TurnFinishInput {
317 mut turn_pipeline,
318 assembler,
319 new_messages,
320 policy,
321 turn_index,
322 queued_work_completions,
323 trace_turn_id,
324 } = finish;
325 self.policy = self.state.effective_policy().clone();
326 turn_pipeline.state_mut().policy = self.policy.clone();
327 turn_pipeline.state_mut().turn_index = turn_index;
328
329 let mut turn_usage_delta = {
330 let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
331 std::mem::take(&mut *ledger)
332 };
333 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
334 turn_usage_delta.push(TokenLedgerEntry {
335 source: "turn".to_string(),
336 model: policy.model.id.clone(),
337 usage: assembler.token_usage.clone(),
338 });
339 }
340 let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
341
342 turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
343 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
344 turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
345 }
346
347 let last_prompt_usage = assembler
348 .last_llm_usage()
349 .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
350 turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
351 let assembled_state = turn_pipeline.export_state_for_assembly();
352 let assembled = assembler.finish(
353 assembled_state,
354 cancel_state.is_cancelled(),
355 None,
356 &self.host.core.control.termination,
357 );
358
359 let Some(session) = self.session.as_ref() else {
360 self.state.apply_snapshot(&assembled.state);
361 self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
362 return Ok(assembled);
363 };
364 self.ensure_session_execution_lease_live(session_execution_lease)
365 .await?;
366
367 let plugins = Arc::clone(session.plugins());
368 let manager = match self.runtime_session_services_for_turn(None) {
369 Ok(manager) => manager,
370 Err(err) => {
371 return Err(RuntimeError::new(
372 RuntimeErrorCode::PluginSessionManager,
373 err.to_string(),
374 ));
375 }
376 };
377
378 self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
379 let finalized = match plugins
380 .finalize_turn_with_phase_probe(
381 assembled,
382 manager.state_service(),
383 manager.lifecycle_service(),
384 manager.graph_service(),
385 self.turn_phase_probe.clone(),
386 )
387 .await
388 {
389 Ok(finalized) => finalized,
390 Err(err) => {
391 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
392 return Err(RuntimeError::new(
393 RuntimeErrorCode::PluginFinalizeTurn,
394 err.to_string(),
395 ));
396 }
397 };
398 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
399 self.ensure_session_execution_lease_live(session_execution_lease)
400 .await?;
401
402 let mut returned_turn = finalized.turn;
403 let release_session_execution_lease =
404 session_execution_lease_release_policy.should_release(&returned_turn.outcome);
405 self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
406 self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
407 let queued_work_completion_trace = queued_work_completions.clone();
408 let pending_attachment_ids = self
409 .host
410 .core
411 .durability
412 .attachment_store
413 .pending_manifest_commit_ids();
414 if let Err(err) = turn_pipeline
415 .final_commit(
416 &mut returned_turn,
417 self.session.as_mut(),
418 &turn_usage_delta,
419 Some(&trace_turn_id),
420 queued_work_completions,
421 pending_attachment_ids.clone(),
422 release_session_execution_lease
423 .then(|| session_execution_lease.map(SessionExecutionLeaseGuard::completion))
424 .flatten(),
425 )
426 .await
427 {
428 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
429 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
430 return Err(err);
431 }
432 if release_session_execution_lease && let Some(lease) = session_execution_lease {
433 lease.mark_released();
434 }
435 self.host
436 .core
437 .durability
438 .attachment_store
439 .mark_manifest_committed(&pending_attachment_ids);
440 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
441
442 emit_session_events_to_sink(events, finalized.events).await;
443 self.state = turn_pipeline.into_final_state();
444 if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
445 && let Some(session) = self.session.as_mut()
446 {
447 let protocol_session = Arc::clone(session.plugins().protocol_session());
448 let session_id = self.state.session_id.clone();
449 protocol_session
450 .restore_session(
451 crate::plugin::ProtocolSessionContext::new(session, &session_id),
452 &self.state,
453 )
454 .await
455 .map_err(|err| {
456 RuntimeError::new(
457 RuntimeErrorCode::Other("protocol_restore_session".to_string()),
458 err.to_string(),
459 )
460 })?;
461 }
462 if !queued_work_completion_trace.is_empty() {
463 crate::trace::emit_trace(
464 &self.host.core.tracing.trace_sink,
465 &self.host.core.tracing.trace_context,
466 lash_trace::TraceContext::default()
467 .for_session(returned_turn.state.session_id.clone())
468 .for_turn_index(returned_turn.state.turn_index)
469 .for_turn(trace_turn_id.clone()),
470 lash_trace::TraceEvent::Custom {
471 name: "queued_work.completed".to_string(),
472 payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
473 },
474 self.host.core.clock.as_ref(),
475 );
476 }
477 self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
478 self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
479 .await?;
480 self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
481 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
482
483 self.emit_completed_turn_trace(
484 &returned_turn.state,
485 &returned_turn.outcome,
486 &trace_turn_id,
487 );
488 Ok(returned_turn)
489 }
490
491 fn emit_completed_turn_trace(
492 &self,
493 state: &SessionSnapshot,
494 outcome: &TurnOutcome,
495 trace_turn_id: &str,
496 ) {
497 if self.host.core.tracing.trace_sink.is_none() {
498 return;
499 }
500
501 let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
502 crate::trace::emit_trace(
503 &self.host.core.tracing.trace_sink,
504 &self.host.core.tracing.trace_context,
505 lash_trace::TraceContext::default()
506 .for_session(state.session_id.clone())
507 .for_turn_index(state.turn_index)
508 .for_turn(trace_turn_id.to_string()),
509 lash_trace::TraceEvent::TurnCompleted {
510 status: status.to_string(),
511 done_reason: done_reason.to_string(),
512 agent_frame_switch,
513 },
514 self.host.core.clock.as_ref(),
515 );
516 }
517
518 async fn emit_turn_persisted_event(
519 &self,
520 returned_turn: &AssembledTurn,
521 scoped_effect_controller: &ScopedEffectController<'_>,
522 trace_turn_id: &str,
523 ) -> Result<(), RuntimeError> {
524 let Some(session) = self.session.as_ref() else {
525 return Ok(());
526 };
527 let Ok(manager) = self.runtime_session_services() else {
528 return Ok(());
529 };
530 let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
531 let phase_controller = scoped_child_turn_controller(
532 scoped_effect_controller,
533 &self.state.session_id,
534 &phase_turn_id,
535 )?;
536 let direct_completions = manager.direct_completion_client(
537 RuntimeEffectControllerHandle::borrowed(phase_controller),
538 Some(phase_turn_id),
539 );
540
541 session
542 .plugins()
543 .emit_runtime_event_with_phase_probe(
544 crate::PluginLifecycleEvent::TurnPersisted(Box::new(
545 crate::SessionStateChangedContext {
546 session_id: self.state.session_id.clone(),
547 state: crate::SessionReadView::from_snapshot(&returned_turn.state),
548 sessions: manager.state_service(),
549 session_graph: manager.graph_service(),
550 direct_completions,
551 },
552 )),
553 self.turn_phase_probe.clone(),
554 )
555 .await;
556 Ok(())
557 }
558
559 pub async fn stream_turn(
561 &mut self,
562 input: TurnInput,
563 opts: TurnOptions<'_>,
564 ) -> Result<AssembledTurn, RuntimeError> {
565 let cancel = opts.cancel.clone();
566 let session_execution_lease = self
567 .claim_session_execution_lease(cancel.clone(), true)
568 .await?;
569 let scoped_effect_controller = opts.scoped_effect_controller();
570 let result = self
571 .stream_turn_with_scoped_effect_controller_inner(
572 input,
573 opts.events_or_noop(),
574 opts.turn_events_or_noop(),
575 scoped_effect_controller,
576 cancel,
577 None,
578 session_execution_lease.as_ref(),
579 SessionExecutionLeaseReleasePolicy::FinalCommit,
580 )
581 .await;
582 self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
583 .await
584 }
585
586 pub async fn stream_next_queued_work(
587 &mut self,
588 opts: TurnOptions<'_>,
589 ) -> Result<Option<AssembledTurn>, RuntimeError> {
590 self.stream_queued_work(opts, None).await
591 }
592
593 pub async fn stream_selected_queued_work(
594 &mut self,
595 opts: TurnOptions<'_>,
596 batch_ids: &[String],
597 ) -> Result<Option<AssembledTurn>, RuntimeError> {
598 self.stream_queued_work(opts, Some(batch_ids)).await
599 }
600
601 async fn stream_queued_work(
602 &mut self,
603 opts: TurnOptions<'_>,
604 selected_batch_ids: Option<&[String]>,
605 ) -> Result<Option<AssembledTurn>, RuntimeError> {
606 let cancel = opts.cancel.clone();
607 let Some(session_execution_lease) = self
608 .claim_session_execution_lease(cancel.clone(), false)
609 .await?
610 else {
611 return Ok(None);
612 };
613 let session_execution_fence = session_execution_lease.fence();
614 if self
615 .drain_next_session_command(&session_execution_fence)
616 .await?
617 .is_some()
618 {
619 session_execution_lease
620 .release_if_live()
621 .await
622 .map_err(|err| {
623 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
624 })?;
625 return Ok(None);
626 }
627 let Some(store) = self
628 .session
629 .as_ref()
630 .and_then(|session| session.history_store())
631 else {
632 return Ok(None);
633 };
634 let claim = if let Some(batch_ids) = selected_batch_ids {
635 store
636 .claim_ready_queued_work_by_batch_ids(
637 &self.state.session_id,
638 &session_execution_fence,
639 &self.runtime_lease_owner,
640 crate::QueuedWorkClaimBoundary::Idle,
641 crate::QUEUED_WORK_CLAIM_TTL_MS,
642 batch_ids,
643 )
644 .await
645 } else {
646 store
647 .claim_ready_queued_work(
648 &self.state.session_id,
649 &session_execution_fence,
650 &self.runtime_lease_owner,
651 crate::QueuedWorkClaimBoundary::Idle,
652 crate::QUEUED_WORK_CLAIM_TTL_MS,
653 64,
654 )
655 .await
656 }
657 .map_err(super::runtime_error_from_store_commit)?;
658 let Some(claim) = claim else {
659 session_execution_lease
660 .release_if_live()
661 .await
662 .map_err(|err| {
663 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
664 })?;
665 return Ok(None);
666 };
667 let mut work = claim.materialize_for_turn();
668 let turn_id = work
669 .input
670 .trace_turn_id
671 .clone()
672 .or_else(|| Some(opts.execution_scope_id().to_owned()))
673 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
674 work.input.trace_turn_id = Some(turn_id.clone());
675 let causes = work.turn_causes.clone();
676 emit_queued_work_started_to_sink(
677 opts.turn_events_or_noop(),
678 crate::QueuedWorkClaimBoundary::Idle,
679 &claim,
680 causes.clone(),
681 )
682 .await;
683 crate::trace::emit_trace(
684 &self.host.core.tracing.trace_sink,
685 &self.host.core.tracing.trace_context,
686 lash_trace::TraceContext::default()
687 .for_session(self.state.session_id.clone())
688 .for_turn_index(self.state.turn_index + 1)
689 .for_turn(turn_id.clone()),
690 lash_trace::TraceEvent::Custom {
691 name: "queued_work.claimed".to_string(),
692 payload: queued_work_trace_payload(
693 crate::QueuedWorkClaimBoundary::Idle,
694 &claim,
695 &causes,
696 ),
697 },
698 self.host.core.clock.as_ref(),
699 );
700 let claim_for_abandon = claim.clone();
701 let scoped_effect_controller = opts.scoped_effect_controller();
702 let result = self
703 .stream_turn_with_scoped_effect_controller_inner(
704 work.input,
705 opts.events_or_noop(),
706 opts.turn_events_or_noop(),
707 scoped_effect_controller,
708 cancel,
709 Some(claim),
710 Some(&session_execution_lease),
711 SessionExecutionLeaseReleasePolicy::FinalCommit,
712 )
713 .await
714 .map(Some);
715 if let Err(err) = &result {
716 self.abandon_queued_work_claims_after_lease_loss(
717 err,
718 std::slice::from_ref(&claim_for_abandon),
719 )
720 .await;
721 }
722 self.settle_session_execution_lease(Some(&session_execution_lease), result)
723 .await
724 }
725
726 fn ensure_durable_store_facets_for_scope(
734 &self,
735 scoped_effect_controller: &ScopedEffectController<'_>,
736 ) -> Result<(), RuntimeError> {
737 if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
738 {
739 return Ok(());
740 }
741 if self
742 .host
743 .core
744 .durability
745 .attachment_store
746 .persistence()
747 .durability_tier()
748 != crate::DurabilityTier::Durable
749 {
750 return Err(RuntimeError::durable_store_required(
751 crate::DurableStoreFacet::AttachmentStore,
752 ));
753 }
754 if self
755 .host
756 .core
757 .durability
758 .process_env_store
759 .durability_tier()
760 != crate::DurabilityTier::Durable
761 {
762 return Err(RuntimeError::durable_store_required(
763 crate::DurableStoreFacet::ProcessEnvStore,
764 ));
765 }
766 if let Some(store) = self
767 .session
768 .as_ref()
769 .and_then(|session| session.history_store())
770 && store.durability_tier() != crate::DurabilityTier::Durable
771 {
772 return Err(RuntimeError::durable_store_required(
773 crate::DurableStoreFacet::SessionStore,
774 ));
775 }
776 if let Some(process_registry) = self.host.process_registry.as_ref()
777 && process_registry.durability_tier() != crate::DurabilityTier::Durable
778 {
779 return Err(RuntimeError::durable_store_required(
780 crate::DurableStoreFacet::ProcessRegistry,
781 ));
782 }
783 Ok(())
784 }
785
786 #[allow(clippy::too_many_arguments)]
787 async fn stream_turn_with_scoped_effect_controller_inner(
788 &mut self,
789 mut input: TurnInput,
790 events: &dyn EventSink,
791 turn_events: &dyn TurnActivitySink,
792 scoped_effect_controller: ScopedEffectController<'_>,
793 cancel: CancellationToken,
794 queued_claim: Option<crate::QueuedWorkClaim>,
795 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
796 session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
797 ) -> Result<AssembledTurn, RuntimeError> {
798 if queued_claim.is_none() {
799 if let Some(lease) = session_execution_lease {
800 while self
801 .drain_next_session_command(&lease.fence())
802 .await?
803 .is_some()
804 {}
805 } else if self
806 .session
807 .as_ref()
808 .and_then(|session| session.history_store())
809 .is_some()
810 {
811 return Err(RuntimeError::new(
812 RuntimeErrorCode::StoreCommitFailed,
813 "session command drain requires a session execution lease",
814 ));
815 }
816 }
817 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
818 && scoped_effect_controller
819 .execution_scope()
820 .validates_turn_trace_id()
821 && input_turn_id != scoped_effect_controller.scope_id()
822 {
823 return Err(RuntimeError::new(
824 RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
825 format!(
826 "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
827 scoped_effect_controller.scope_id()
828 ),
829 ));
830 }
831 self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
832 input
833 .trace_turn_id
834 .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
835 self.stream_turn_inner(
836 input.clone(),
837 events,
838 turn_events,
839 scoped_effect_controller,
840 cancel.clone(),
841 queued_claim,
842 session_execution_lease,
843 session_execution_lease_release_policy,
844 )
845 .await
846 }
847
848 pub async fn stream_turn_with_agent_frames(
857 &mut self,
858 input: TurnInput,
859 opts: TurnOptions<'_>,
860 ) -> Result<AgentFrameRun, RuntimeError> {
861 let cancel = opts.cancel.clone();
862 let session_execution_lease = self
863 .claim_session_execution_lease(cancel.clone(), true)
864 .await?;
865 let scoped_effect_controller = opts.scoped_effect_controller();
866 let result = self
867 .stream_turn_with_agent_frames_inner(
868 input,
869 opts.events_or_noop(),
870 opts.turn_events_or_noop(),
871 scoped_effect_controller,
872 cancel,
873 session_execution_lease.as_ref(),
874 )
875 .await;
876 self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
877 .await
878 }
879
880 async fn stream_turn_with_agent_frames_inner(
881 &mut self,
882 mut input: TurnInput,
883 events: &dyn EventSink,
884 turn_events: &dyn TurnActivitySink,
885 scoped_effect_controller: ScopedEffectController<'_>,
886 cancel: CancellationToken,
887 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
888 ) -> Result<AgentFrameRun, RuntimeError> {
889 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
890 && scoped_effect_controller
891 .execution_scope()
892 .validates_turn_trace_id()
893 && input_turn_id != scoped_effect_controller.scope_id()
894 {
895 return Err(RuntimeError::new(
896 RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
897 format!(
898 "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
899 scoped_effect_controller.scope_id()
900 ),
901 ));
902 }
903 let follow_protocol_turn_options = input.protocol_turn_options.clone();
904 let follow_turn_context = input.turn_context.clone();
905 let follow_trace_turn_id = input
906 .trace_turn_id
907 .clone()
908 .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
909 input
910 .trace_turn_id
911 .get_or_insert(follow_trace_turn_id.clone());
912 let mut turns = Vec::new();
913 loop {
914 let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
915 input.trace_turn_id = Some(turn_trace_turn_id.clone());
916 let turn_effect_controller = if turns.is_empty() {
917 scoped_effect_controller.clone()
918 } else {
919 ScopedEffectController::borrowed(
920 scoped_effect_controller.controller(),
921 ExecutionScope::turn(&self.state.session_id, &turn_trace_turn_id),
922 )?
923 };
924 let turn = self
925 .stream_turn_with_scoped_effect_controller_inner(
926 input,
927 events,
928 turn_events,
929 turn_effect_controller,
930 cancel.clone(),
931 None,
932 session_execution_lease,
933 SessionExecutionLeaseReleasePolicy::KeepOnAgentFrameSwitch,
934 )
935 .await?;
936 let switched_frame = match &turn.outcome {
937 TurnOutcome::AgentFrameSwitch { frame_id, task } => {
938 Some((frame_id.clone(), task.clone()))
939 }
940 _ => None,
941 };
942 turns.push(turn);
943
944 let Some((_frame_id, task)) = switched_frame else {
945 return Ok(AgentFrameRun { turns });
946 };
947 input = turn_input_from_text(task);
948 input.protocol_turn_options = follow_protocol_turn_options.clone();
949 input.turn_context = follow_turn_context.clone();
950 }
951 }
952
953 #[allow(clippy::too_many_arguments)]
954 async fn stream_turn_inner(
955 &mut self,
956 mut input: TurnInput,
957 events: &dyn EventSink,
958 turn_events: &dyn TurnActivitySink,
959 scoped_effect_controller: ScopedEffectController<'_>,
960 cancel: CancellationToken,
961 queued_claim: Option<crate::QueuedWorkClaim>,
962 session_execution_lease: Option<&SessionExecutionLeaseGuard>,
963 session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
964 ) -> Result<AssembledTurn, RuntimeError> {
965 self.refresh_session_graph_from_store()
966 .await
967 .map_err(session_head_refresh_error)?;
968 let input_trace_turn_id = input.trace_turn_id.clone();
969 let queued_turn_work = queued_claim
970 .as_ref()
971 .map(crate::QueuedWorkClaim::materialize_for_turn);
972 if let Some(work) = queued_turn_work.as_ref()
973 && input.items.is_empty()
974 && input.image_blobs.is_empty()
975 {
976 input = work.input.clone();
977 if input.trace_turn_id.is_none() {
978 input.trace_turn_id = input_trace_turn_id;
979 }
980 }
981 if self
982 .session
983 .as_ref()
984 .and_then(|session| session.history_store())
985 .is_some()
986 {
987 ensure_durable_effect_input(&input)?;
988 }
989 if let Some(extension) = &input.protocol_extension
990 && let Some(session) = self.session.as_ref()
991 {
992 let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
993 protocol_session
994 .validate_turn_extension(extension)
995 .await
996 .map_err(|err| {
997 RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
998 })?;
999 }
1000 let previous_prompt_usage = self.state.last_prompt_usage.clone();
1001 let normalized = match self
1002 .normalize_input_items(&input.items, &input.image_blobs)
1003 .await
1004 {
1005 Ok(items) => items,
1006 Err(e) => {
1007 self.state.last_prompt_usage = None;
1008 let mut assembler = TurnAssembler::default();
1009 let error_event = SessionEvent::Error {
1010 message: e.clone(),
1011 envelope: Some(crate::session_model::ErrorEnvelope {
1012 kind: "input_validation".to_string(),
1013 code: Some("invalid_turn_input".to_string()),
1014 terminal_reason: None,
1015 user_message: e.clone(),
1016 raw: None,
1017 }),
1018 };
1019 assembler.push(&error_event);
1020 emit_turn_activity_to_sink(
1021 turn_events,
1022 TurnActivity::independent(TurnEvent::Error { message: e }),
1023 )
1024 .await;
1025 emit_session_event_to_sink(events, error_event).await;
1026 let outcome_event = SessionEvent::TurnOutcome {
1027 outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
1028 };
1029 assembler.push(&outcome_event);
1030 emit_session_event_to_sink(events, outcome_event).await;
1031 assembler.push(&SessionEvent::Done);
1032 emit_session_event_to_sink(events, SessionEvent::Done).await;
1033 return Ok(assembler.finish(
1034 self.state.to_snapshot(),
1035 false,
1036 None,
1037 &self.host.core.control.termination,
1038 ));
1039 }
1040 };
1041 let turn_index = self.state.turn_index + 1;
1042 let trace_turn_id = input
1043 .trace_turn_id
1044 .clone()
1045 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1046 if self.host.core.tracing.trace_sink.is_some() {
1047 let mut trace_metadata = std::collections::BTreeMap::new();
1048 trace_metadata.insert(
1049 "input_item_count".to_string(),
1050 serde_json::json!(normalized.len()),
1051 );
1052 crate::trace::emit_trace(
1053 &self.host.core.tracing.trace_sink,
1054 &self.host.core.tracing.trace_context,
1055 lash_trace::TraceContext::default()
1056 .for_session(self.state.session_id.clone())
1057 .for_turn_index(turn_index)
1058 .for_turn(trace_turn_id.clone()),
1059 lash_trace::TraceEvent::TurnStarted {
1060 metadata: trace_metadata,
1061 },
1062 self.host.core.clock.as_ref(),
1063 );
1064 }
1065
1066 let base_read_model = self.state.read_model();
1067 let base_messages = base_read_model.messages;
1068 let base_render_cache = base_read_model.prompt_render_cache;
1069 let mut turn_delta = Vec::new();
1070 let initial_turn_causes = queued_turn_work
1071 .as_ref()
1072 .map(|work| work.turn_causes.clone())
1073 .unwrap_or_default();
1074 turn_delta.extend(
1075 initial_turn_causes
1076 .iter()
1077 .map(crate::TurnCause::to_event_message),
1078 );
1079
1080 let user_id = fresh_message_id();
1081 let mut user_parts: Vec<Part> = Vec::new();
1082 for item in normalized {
1083 match item {
1084 NormalizedItem::Text(text) => {
1085 if text.is_empty() {
1086 continue;
1087 }
1088 user_parts.push(Part {
1089 id: format!("{}.p{}", user_id, user_parts.len()),
1090 kind: PartKind::Text,
1091 content: text,
1092 attachment: None,
1093 tool_call_id: None,
1094 tool_name: None,
1095 tool_replay: None,
1096 prune_state: PruneState::Intact,
1097 reasoning_meta: None,
1098 response_meta: None,
1099 });
1100 }
1101 NormalizedItem::Image(reference) => {
1102 user_parts.push(Part {
1103 id: format!("{}.p{}", user_id, user_parts.len()),
1104 kind: PartKind::Image,
1105 content: String::new(),
1106 attachment: Some(crate::session_model::message::PartAttachment {
1107 reference,
1108 }),
1109 tool_call_id: None,
1110 tool_name: None,
1111 tool_replay: None,
1112 prune_state: PruneState::Intact,
1113 reasoning_meta: None,
1114 response_meta: None,
1115 });
1116 }
1117 }
1118 }
1119 if user_parts.is_empty() && initial_turn_causes.is_empty() {
1120 user_parts.push(Part {
1121 id: format!("{}.p0", user_id),
1122 kind: PartKind::Text,
1123 content: String::new(),
1124 attachment: None,
1125 tool_call_id: None,
1126 tool_name: None,
1127 tool_replay: None,
1128 prune_state: PruneState::Intact,
1129 reasoning_meta: None,
1130 response_meta: None,
1131 });
1132 }
1133 if !user_parts.is_empty() {
1134 reassign_part_ids(&user_id, &mut user_parts);
1135 turn_delta.push(Message {
1136 id: user_id.clone(),
1137 role: MessageRole::User,
1138 parts: shared_parts(user_parts),
1139 origin: None,
1140 });
1141 }
1142
1143 let manager = self
1144 .runtime_session_services_for_turn(None)
1145 .map_err(|err| {
1146 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1147 })?;
1148 let plugin_session = self
1149 .session
1150 .as_ref()
1151 .map(|s| Arc::clone(s.plugins()))
1152 .ok_or_else(|| {
1153 RuntimeError::new(
1154 RuntimeErrorCode::ContextPrepareTurn,
1155 "runtime session not available",
1156 )
1157 })?;
1158 let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
1159 let prepare_phase_controller = scoped_child_turn_controller(
1160 &scoped_effect_controller,
1161 &self.state.session_id,
1162 &prepare_phase_turn_id,
1163 )?;
1164 let turn_ctx = crate::TurnTransformContext {
1165 session_id: self.state.session_id.clone(),
1166 state: self.read_view(),
1167 prompt_usage: previous_prompt_usage.clone(),
1168 max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
1169 sessions: manager.state_service(),
1170 session_lifecycle: manager.lifecycle_service(),
1171 session_graph: manager.graph_service(),
1172 scoped_effect_controller: scoped_effect_controller.clone(),
1173 direct_completions: manager.direct_completion_client(
1174 RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
1175 Some(prepare_phase_turn_id),
1176 ),
1177 };
1178 self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
1179 let prepared_context = plugin_session
1180 .prepare_turn_context(
1181 &turn_ctx,
1182 crate::session_model::context::PreparedContext {
1183 messages: crate::MessageSequence::from_base_and_delta(
1184 base_messages,
1185 turn_delta,
1186 )
1187 .with_base_render_cache(base_render_cache),
1188 ..Default::default()
1189 },
1190 self.turn_phase_probe.clone(),
1191 )
1192 .await
1193 .map_err(|err| {
1194 RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
1195 })?;
1196 self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
1197 drop(turn_ctx);
1202 let messages = prepared_context.messages;
1203 if let Some(session) = self.session.as_mut() {
1204 session
1205 .set_context_overlay(
1206 prepared_context.tool_providers,
1207 prepared_context.prompt_contributions,
1208 prepared_context.include_base_tools,
1209 )
1210 .map_err(|err| {
1211 RuntimeError::new(
1212 RuntimeErrorCode::Other("session_tool_registry".to_string()),
1213 err.to_string(),
1214 )
1215 })?;
1216 }
1217
1218 self.state.last_prompt_usage = None;
1219 Box::pin(self.stream_prepared_turn_inner(
1220 messages,
1221 previous_prompt_usage,
1222 input.protocol_turn_options.clone(),
1223 input.protocol_extension.clone(),
1224 input.turn_context.clone(),
1225 initial_turn_causes,
1226 trace_turn_id,
1227 turn_index,
1228 events,
1229 turn_events,
1230 scoped_effect_controller,
1231 cancel,
1232 queued_claim,
1233 session_execution_lease,
1234 session_execution_lease_release_policy,
1235 ))
1236 .await
1237 }
1238
1239 pub async fn run_turn_assembled(
1241 &mut self,
1242 input: TurnInput,
1243 cancel: CancellationToken,
1244 scoped_effect_controller: ScopedEffectController<'_>,
1245 ) -> Result<AssembledTurn, RuntimeError> {
1246 self.stream_turn(input, TurnOptions::new(cancel, scoped_effect_controller))
1247 .await
1248 }
1249
1250 #[allow(clippy::too_many_arguments)]
1252 pub async fn stream_prepared_turn(
1253 &mut self,
1254 messages: crate::MessageSequence,
1255 previous_prompt_usage: Option<PromptUsage>,
1256 protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1257 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1258 turn_context: crate::TurnContext,
1259 initial_turn_causes: Vec<crate::TurnCause>,
1260 trace_turn_id: String,
1261 turn_index: usize,
1262 events: &dyn EventSink,
1263 turn_events: &dyn TurnActivitySink,
1264 scoped_effect_controller: ScopedEffectController<'_>,
1265 cancel: CancellationToken,
1266 initial_queue_claim: Option<crate::QueuedWorkClaim>,
1267 ) -> Result<AssembledTurn, RuntimeError> {
1268 let session_execution_lease = self
1269 .claim_session_execution_lease(cancel.clone(), true)
1270 .await?;
1271 let result = self
1272 .stream_prepared_turn_inner(
1273 messages,
1274 previous_prompt_usage,
1275 protocol_turn_options,
1276 protocol_extension,
1277 turn_context,
1278 initial_turn_causes,
1279 trace_turn_id,
1280 turn_index,
1281 events,
1282 turn_events,
1283 scoped_effect_controller,
1284 cancel,
1285 initial_queue_claim,
1286 session_execution_lease.as_ref(),
1287 SessionExecutionLeaseReleasePolicy::FinalCommit,
1288 )
1289 .await;
1290 self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
1291 .await
1292 }
1293
/// Runs an already-prepared message sequence through the rest of the turn.
///
/// In order, this method:
/// 1. refuses to run without a session execution lease when the session has a
///    history store,
/// 2. runs the plugin `prepare_turn` hooks while forwarding any runtime stream
///    events that arrive in the meantime,
/// 3. short-circuits with a `TurnStop::PluginAbort` outcome if a plugin aborted,
/// 4. checkpoints the prepared messages, resolves the turn policy and drives a
///    `RuntimeTurnDriver` to completion,
/// 5. hands the result to `finish_turn`.
///
/// `_previous_prompt_usage` is accepted but not read by this method.
///
/// # Errors
///
/// Returns a `RuntimeError` when the lease precondition fails, when session
/// services, plugin preparation, checkpointing or policy resolution fail, when
/// the driver run fails, or when `finish_turn` fails. A plugin abort is *not*
/// an `Err`: it yields `Ok` with the issue attached to the assembled turn.
///
/// # Panics
///
/// Panics if `self.session` is `None` at the points where the session is
/// borrowed for its plugins or taken for the driver.
#[allow(clippy::too_many_arguments)]
async fn stream_prepared_turn_inner(
    &mut self,
    messages: crate::MessageSequence,
    _previous_prompt_usage: Option<PromptUsage>,
    protocol_turn_options: Option<crate::ProtocolTurnOptions>,
    protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
    turn_context: crate::TurnContext,
    initial_turn_causes: Vec<crate::TurnCause>,
    trace_turn_id: String,
    turn_index: usize,
    events: &dyn EventSink,
    turn_events: &dyn TurnActivitySink,
    scoped_effect_controller: ScopedEffectController<'_>,
    cancel: CancellationToken,
    initial_queue_claim: Option<crate::QueuedWorkClaim>,
    session_execution_lease: Option<&SessionExecutionLeaseGuard>,
    session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
) -> Result<AssembledTurn, RuntimeError> {
    // A session that has a history store must not run a prepared turn without
    // a lease; sessions without a history store may proceed lease-less.
    if session_execution_lease.is_none()
        && self
            .session
            .as_ref()
            .and_then(|session| session.history_store())
            .is_some()
    {
        return Err(RuntimeError::new(
            RuntimeErrorCode::StoreCommitFailed,
            "prepared turn requires a session execution lease",
        ));
    }
    let session_execution_fence =
        session_execution_lease.map(SessionExecutionLeaseGuard::fence);
    // Channel carrying runtime stream events for this turn. The relay gets its
    // own clone of the sender; the original sender is later moved into the
    // driver run (or dropped on the abort path).
    let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
    let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
    // Start from the session's effective policy, then apply per-turn provider
    // and model overrides from `turn_context` when present.
    let mut turn_policy = self.state.effective_policy().clone();
    let turn_provider_override = turn_context.provider().cloned();
    if let Some(provider) = turn_provider_override.as_ref() {
        turn_policy.provider_id = provider.kind().to_string();
    }
    if let Some(model) = turn_context.model_spec() {
        turn_policy.model = model.clone();
    }
    // Per-turn protocol options, when supplied, are merged over the session's
    // options; otherwise the session's options are used unchanged.
    let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
    let effective_protocol_turn_options = protocol_turn_options
        .clone()
        .map(|options| session_protocol_turn_options.merged_with_override(&options))
        .unwrap_or(session_protocol_turn_options);
    let manager = self
        .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
        .map_err(|err| {
            RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
        })?;
    let plugins = {
        let session = self
            .session
            .as_ref()
            .expect("lash runtime session must be available");
        Arc::clone(session.plugins())
    };
    let mut assembler = TurnAssembler::new();
    self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
    let prepared = {
        let prepare_turn = plugins.prepare_turn_with_phase_probe(
            PrepareTurnRequest {
                session_id: self.state.session_id.clone(),
                state: crate::SessionReadView::from_runtime_state(
                    &self.state,
                    turn_policy.clone(),
                    effective_protocol_turn_options.clone(),
                ),
                messages,
                sessions: manager.state_service(),
                session_lifecycle: manager.lifecycle_service(),
                session_graph: manager.graph_service(),
                turn_context: turn_context.clone(),
            },
            self.turn_phase_probe.clone(),
        );
        let mut prepare_turn = Box::pin(prepare_turn);

        // Poll the prepare future and the event channel together so events
        // arriving during preparation are forwarded to the sinks (and recorded
        // in the assembler) instead of waiting until preparation finishes.
        // NOTE(review): on a prepare error the `?` below returns before
        // `mark_phase_end(BeforeTurnHooks)` runs — confirm that is intended.
        loop {
            tokio::select! {
                prepared = prepare_turn.as_mut() => {
                    let prepared = prepared.map_err(|err| {
                        RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
                    })?;
                    self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
                    break prepared;
                }
                maybe_event = event_rx.recv() => {
                    if let Some(event) = maybe_event {
                        emit_runtime_stream_event_to_sinks(
                            events,
                            turn_events,
                            event,
                            &mut assembler,
                        )
                        .await;
                    }
                }
            }
        }
    };
    // Events produced by the prepare hooks are recorded first, then emitted.
    for event in &prepared.events {
        assembler.push(event);
    }
    emit_session_events_to_sink(events, prepared.events).await;
    if let Some(abort) = prepared.abort {
        // Plugin abort: the driver never runs, so this sender is not needed.
        drop(event_tx);

        // Build the final state from the prepared messages only. Note that
        // this boundary is created without the session execution lease fence.
        let mut turn_pipeline = TurnBoundary::from_state_with_clock(
            self.state.clone(),
            Arc::clone(&self.host.core.clock),
        );
        turn_pipeline.apply_prepared_messages(&prepared.messages);
        let state = turn_pipeline.into_final_state();
        let issue = TurnIssue {
            kind: "plugin".to_string(),
            code: Some(abort.code),
            terminal_reason: None,
            message: abort.message.clone(),
            raw: None,
        };
        let error_event = SessionEvent::Error {
            message: abort.message,
            envelope: Some(crate::session_model::ErrorEnvelope {
                kind: "plugin".to_string(),
                code: issue.code.clone(),
                terminal_reason: None,
                user_message: issue.message.clone(),
                raw: None,
            }),
        };
        // Emit, in order: the error (to both sinks), the PluginAbort outcome,
        // and the terminating Done event. Each session event is also pushed
        // into the assembler before it is emitted.
        assembler.push(&error_event);
        emit_turn_activity_to_sink(
            turn_events,
            TurnActivity::independent(TurnEvent::Error {
                message: issue.message.clone(),
            }),
        )
        .await;
        emit_session_event_to_sink(events, error_event).await;
        let outcome_event = SessionEvent::TurnOutcome {
            outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
        };
        assembler.push(&outcome_event);
        emit_session_event_to_sink(events, outcome_event).await;
        assembler.push(&SessionEvent::Done);
        emit_session_event_to_sink(events, SessionEvent::Done).await;
        // The abort is reported as `Ok` with the issue attached, not as `Err`.
        return Ok(assembler.finish(
            state.to_snapshot(),
            cancel.is_cancelled(),
            Some(issue),
            &self.host.core.control.termination,
        ));
    }
    let mut turn_pipeline = TurnBoundary::from_state_with_clock(
        self.state.clone(),
        Arc::clone(&self.host.core.clock),
    )
    .with_session_execution_lease(session_execution_fence.clone());
    let store = self
        .session
        .as_ref()
        .and_then(|session| session.history_store());
    // No store is passed to the checkpoint when the effect controller reports
    // the `Durable` tier; otherwise the session's history store (if any) is.
    // NOTE(review): presumably durable controllers persist progress through
    // their own path — confirm against the controller implementation.
    let progress_store = if scoped_effect_controller.controller().durability_tier()
        == crate::DurabilityTier::Durable
    {
        None
    } else {
        store.as_ref().map(|store| store.as_ref())
    };
    turn_pipeline
        .prepared_checkpoint(
            progress_store,
            turn_policy.clone(),
            turn_index,
            &prepared.messages,
            self.session.as_mut(),
        )
        .await
        .map_err(super::runtime_error_from_store_commit)?;
    // A per-turn provider override builds the policy directly from that
    // provider; otherwise the host resolves the policy for this session.
    let resolved_turn_policy = if let Some(provider) = turn_provider_override {
        RuntimeSessionPolicy::from_provider(
            turn_policy.clone(),
            provider.with_clock(Arc::clone(&self.host.core.clock)),
        )
        .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
    } else {
        self.host
            .resolve_session_policy(&self.state.session_id, turn_policy.clone())
            .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
    };
    let manager = self
        .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
        .map_err(|err| {
            RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
        })?;
    // Clones kept for `finish_turn`, because the originals are moved into the
    // driver / the driver run below.
    let cancel_state = cancel.clone();
    let finish_scoped_effect_controller = scoped_effect_controller.clone();
    // The session is moved out of `self` into the driver for the duration of
    // the run. Every path below puts it back into `self.session`; there is no
    // early `?` return between this take and those restores.
    let session = self
        .session
        .take()
        .expect("lash runtime session must be available");
    let mut driver = RuntimeTurnDriver {
        session,
        policy: resolved_turn_policy,
        host: self.host.clone(),
        turn_id: scoped_effect_controller.scope_id().to_string(),
        scoped_effect_controller,
        session_id: self.state.session_id.clone(),
        turn_index,
        turn_pipeline,
        llm_stream_summaries: HashMap::new(),
        next_llm_ordinal: 0,
        session_services: manager,
        protocol_turn_options: effective_protocol_turn_options,
        protocol_extension,
        turn_context,
        turn_causes: initial_turn_causes,
        pending_queue_claims: initial_queue_claim.into_iter().collect(),
        checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
        session_execution_lease: session_execution_fence,
        runtime_lease_owner: self.runtime_lease_owner.clone(),
        turn_phase_probe: self.turn_phase_probe.clone(),
    };
    let protocol_run_offset = 0;
    self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
    // `event_tx` is moved into the run future here; `drive_turn_to_completion`
    // forwards events while the run is pending and drains the rest afterwards.
    let run_result = drive_turn_to_completion(
        driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
        &mut event_rx,
        &mut assembler,
        &child_usage_event_relay,
        events,
        turn_events,
    )
    .await;
    let (new_messages, _new_protocol_iteration) = match run_result {
        Ok(result) => result,
        Err(err) => {
            // Failed run: close the phase, restore the session, pass the error
            // and pending claims to the abandon helper, then propagate.
            // NOTE(review): judging by its name the helper only acts on
            // lease-loss errors — verify in its definition.
            self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
            let RuntimeTurnDriver {
                session,
                pending_queue_claims,
                ..
            } = driver;
            self.session = Some(session);
            self.abandon_queued_work_claims_after_lease_loss(&err, &pending_queue_claims)
                .await;
            return Err(err);
        }
    };
    self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
    tracing::debug!(
        new_message_count = new_messages.len(),
        tool_call_count = assembler.tool_calls.len(),
        "runtime post-run_task"
    );

    // Successful run: take back the pieces `finish_turn` needs and restore the
    // session on `self`.
    let RuntimeTurnDriver {
        session,
        policy,
        turn_pipeline,
        pending_queue_claims,
        ..
    } = driver;
    self.session = Some(session);
    // Kept separately so the claims are still available if `finish_turn` fails.
    let pending_queue_claims_for_abandon = pending_queue_claims.clone();
    let finish_result = self
        .finish_turn(
            TurnFinishInput {
                turn_pipeline,
                assembler,
                new_messages,
                policy,
                turn_index,
                queued_work_completions: pending_queue_claims
                    .iter()
                    .map(crate::QueuedWorkClaim::completion)
                    .collect(),
                trace_turn_id,
            },
            events,
            &finish_scoped_effect_controller,
            &cancel_state,
            session_execution_lease,
            session_execution_lease_release_policy,
        )
        .await;
    if let Err(err) = &finish_result {
        self.abandon_queued_work_claims_after_lease_loss(
            err,
            &pending_queue_claims_for_abandon,
        )
        .await;
    }
    finish_result
}
1601 async fn normalize_input_items(
1602 &self,
1603 items: &[InputItem],
1604 image_blobs: &HashMap<String, Vec<u8>>,
1605 ) -> Result<Vec<NormalizedItem>, String> {
1606 normalize_input_items(
1607 items,
1608 image_blobs,
1609 self.host.core.durability.attachment_store.as_ref(),
1610 )
1611 .await
1612 }
1613}
1614
1615fn turn_input_from_text(text: String) -> TurnInput {
1616 TurnInput {
1617 items: vec![InputItem::Text { text }],
1618 image_blobs: HashMap::new(),
1619 protocol_turn_options: None,
1620 trace_turn_id: None,
1621 protocol_extension: None,
1622 turn_context: crate::TurnContext::default(),
1623 }
1624}
1625
/// Derives the turn id for a follow-up turn in an agent-frame chain.
///
/// With zero completed turns the root id is returned unchanged; otherwise the
/// result is `"{root_turn_id}:agent-frame:{completed_turn_count}"`, so each
/// count maps to a distinct, reproducible id.
fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
    match completed_turn_count {
        0 => root_turn_id.to_string(),
        count => format!("{root_turn_id}:agent-frame:{count}"),
    }
}
1633
1634pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1635 if input.protocol_extension.is_some() {
1636 return Err(RuntimeError::new(
1637 RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1638 "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1639 ));
1640 }
1641 input
1642 .turn_context
1643 .live_plugin_inputs()
1644 .durable_effect_rejection()?;
1645 Ok(())
1646}
1647
1648async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1649 if !events.is_noop() {
1650 events.emit(activity).await;
1651 }
1652}
1653
1654async fn drive_turn_to_completion<F>(
1664 run_future: F,
1665 event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1666 assembler: &mut TurnAssembler,
1667 child_usage_event_relay: &ChildUsageEventRelay,
1668 events: &dyn EventSink,
1669 turn_events: &dyn TurnActivitySink,
1670) -> Result<(crate::MessageSequence, usize), RuntimeError>
1671where
1672 F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1673{
1674 let run_result = {
1675 let mut run_future = Box::pin(run_future);
1676 loop {
1677 tokio::select! {
1678 maybe_event = event_rx.recv() => {
1679 if let Some(event) = maybe_event {
1680 emit_runtime_stream_event_to_sinks(
1681 events,
1682 turn_events,
1683 event,
1684 assembler,
1685 )
1686 .await;
1687 }
1688 }
1689 completed = run_future.as_mut() => {
1690 child_usage_event_relay.clear();
1691 break completed;
1692 }
1693 }
1694 }
1695 };
1696 while let Some(event) = event_rx.recv().await {
1697 emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1698 }
1699 run_result
1700}
1701
1702async fn emit_runtime_stream_event_to_sinks(
1703 events: &dyn EventSink,
1704 turn_events: &dyn TurnActivitySink,
1705 event: RuntimeStreamEvent,
1706 assembler: &mut TurnAssembler,
1707) {
1708 match event {
1709 RuntimeStreamEvent::Session(event) => {
1710 assembler.push(&event);
1711 emit_session_event_to_sink(events, event).await;
1712 }
1713 RuntimeStreamEvent::Turn(activity) => {
1714 assembler.push_turn_activity(&activity);
1715 emit_turn_activity_to_sink(turn_events, activity).await;
1716 }
1717 }
1718}
1719
#[cfg(test)]
mod tests {
    use super::agent_frame_follow_turn_id;

    /// The root turn keeps its id; each later turn gets a suffix carrying the
    /// completed-turn count.
    #[test]
    fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
        let expectations = [
            (0, "root-turn"),
            (1, "root-turn:agent-frame:1"),
            (2, "root-turn:agent-frame:2"),
        ];
        for (completed_turn_count, expected) in expectations {
            assert_eq!(
                agent_frame_follow_turn_id("root-turn", completed_turn_count),
                expected
            );
        }
    }
}