1use super::*;
2
3fn trace_fields_from_outcome(
4 outcome: &TurnOutcome,
5) -> (
6 &'static str,
7 &'static str,
8 Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10 match outcome {
11 TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12 ("completed", "assistant_message", None)
13 }
14 TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
15 ("completed", "submitted_value", None)
16 }
17 TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
18 TurnOutcome::AgentFrameSwitch { frame_id, .. } => (
19 "completed",
20 "agent_frame_switch",
21 Some(lash_trace::TraceAgentFrameSwitch {
22 frame_id: frame_id.clone(),
23 }),
24 ),
25 TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
26 }
27}
28
29fn trace_stop_reason(stop: &TurnStop) -> &'static str {
30 match stop {
31 TurnStop::Cancelled => "cancelled",
32 TurnStop::Incomplete => "incomplete",
33 TurnStop::InvalidInput => "invalid_input",
34 TurnStop::MaxTurns => "max_turns",
35 TurnStop::ToolFailure => "tool_failure",
36 TurnStop::ProviderError => "provider_error",
37 TurnStop::PluginAbort => "plugin_abort",
38 TurnStop::RuntimeError => "runtime_error",
39 TurnStop::SubmittedError { .. } => "submitted_error",
40 TurnStop::ToolError { .. } => "tool_error",
41 }
42}
43
44fn session_head_refresh_error(err: SessionError) -> RuntimeError {
45 RuntimeError::new(
46 RuntimeErrorCode::Other("session_head_refresh".to_string()),
47 err.to_string(),
48 )
49}
50
51fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
52 match payload {
53 crate::QueuedWorkPayload::TurnInput { .. } => "turn_input",
54 crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
55 crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
56 }
57}
58
59fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
60 claim
61 .batches
62 .iter()
63 .map(|batch| batch.batch_id.clone())
64 .collect()
65}
66
67fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
68 format!("{parent_turn_id}:{phase}")
69}
70
71fn scoped_child_turn_controller<'run>(
72 scoped_effect_controller: &'run ScopedEffectController<'_>,
73 session_id: &str,
74 turn_id: &str,
75) -> Result<ScopedEffectController<'run>, RuntimeError> {
76 ScopedEffectController::borrowed(
77 scoped_effect_controller.controller(),
78 EffectScope::turn(session_id, turn_id),
79 )
80}
81
82pub(in crate::runtime) fn queued_work_trace_payload(
83 boundary: crate::QueuedWorkClaimBoundary,
84 claim: &crate::QueuedWorkClaim,
85 causes: &[crate::TurnCause],
86) -> serde_json::Value {
87 serde_json::json!({
88 "boundary": boundary,
89 "claim_id": claim.claim_id,
90 "owner_id": claim.owner_id,
91 "batch_ids": queued_work_batch_ids(claim),
92 "payload_types": claim.batches.iter()
93 .flat_map(|batch| batch.items.iter())
94 .map(|item| queued_work_payload_type(&item.payload))
95 .collect::<Vec<_>>(),
96 "causes": causes,
97 })
98}
99
100pub(in crate::runtime) fn queued_work_completion_trace_payload(
101 completions: &[crate::QueuedWorkCompletion],
102) -> serde_json::Value {
103 serde_json::json!({
104 "claims": completions.iter().map(|completion| {
105 serde_json::json!({
106 "session_id": completion.session_id,
107 "claim_id": completion.claim_id,
108 "batch_ids": completion.batch_ids,
109 })
110 }).collect::<Vec<_>>(),
111 })
112}
113
114async fn emit_queued_work_started_to_sink(
115 events: &dyn TurnActivitySink,
116 boundary: crate::QueuedWorkClaimBoundary,
117 claim: &crate::QueuedWorkClaim,
118 causes: Vec<crate::TurnCause>,
119) {
120 emit_turn_activity_to_sink(
121 events,
122 TurnActivity::independent(TurnEvent::QueuedWorkStarted {
123 boundary,
124 batch_ids: queued_work_batch_ids(claim),
125 causes,
126 }),
127 )
128 .await;
129}
130
131pub(in crate::runtime) async fn send_queued_work_started_event(
132 event_tx: &mpsc::Sender<RuntimeStreamEvent>,
133 boundary: crate::QueuedWorkClaimBoundary,
134 claim: &crate::QueuedWorkClaim,
135 causes: Vec<crate::TurnCause>,
136) {
137 send_turn_activity(
138 event_tx,
139 TurnActivityId::fresh(),
140 TurnEvent::QueuedWorkStarted {
141 boundary,
142 batch_ids: queued_work_batch_ids(claim),
143 causes,
144 },
145 )
146 .await;
147}
148
149struct TurnFinishInput {
150 turn_pipeline: TurnBoundary,
151 assembler: TurnAssembler,
152 new_messages: crate::MessageSequence,
153 policy: RuntimeSessionPolicy,
154 turn_index: usize,
155 queued_work_completions: Vec<crate::QueuedWorkCompletion>,
156 trace_turn_id: String,
157}
158
159impl LashRuntime {
160 fn max_context_tokens(&self) -> usize {
161 self.state.effective_policy().context_window_tokens()
162 }
163 #[doc(hidden)]
164 pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
165 self.turn_phase_probe = Some(probe);
166 }
167
168 fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
169 if let Some(probe) = self.turn_phase_probe.as_ref() {
170 probe.begin(phase);
171 }
172 }
173
174 fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
175 if let Some(probe) = self.turn_phase_probe.as_ref() {
176 probe.end(phase);
177 }
178 }
179
180 async fn finish_turn(
181 &mut self,
182 finish: TurnFinishInput,
183 events: &dyn EventSink,
184 scoped_effect_controller: &ScopedEffectController<'_>,
185 cancel_state: &CancellationToken,
186 ) -> Result<AssembledTurn, RuntimeError> {
187 let TurnFinishInput {
188 mut turn_pipeline,
189 assembler,
190 new_messages,
191 policy,
192 turn_index,
193 queued_work_completions,
194 trace_turn_id,
195 } = finish;
196 self.policy = self.state.effective_policy().clone();
197 turn_pipeline.state_mut().policy = self.policy.clone();
198 turn_pipeline.state_mut().turn_index = turn_index;
199
200 let mut turn_usage_delta = {
201 let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
202 std::mem::take(&mut *ledger)
203 };
204 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
205 turn_usage_delta.push(TokenLedgerEntry {
206 source: "turn".to_string(),
207 model: policy.model.id.clone(),
208 usage: assembler.token_usage.clone(),
209 });
210 }
211 let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
212
213 turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
214 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
215 turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
216 }
217
218 let last_prompt_usage = assembler
219 .last_llm_usage()
220 .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
221 turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
222 let assembled_state = turn_pipeline.export_state_for_assembly();
223 let assembled = assembler.finish(
224 assembled_state,
225 cancel_state.is_cancelled(),
226 None,
227 &self.host.core.control.termination,
228 );
229
230 let Some(session) = self.session.as_ref() else {
231 self.state.apply_snapshot(&assembled.state);
232 self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
233 return Ok(assembled);
234 };
235
236 let plugins = Arc::clone(session.plugins());
237 let manager = match self.runtime_session_services_for_turn(None) {
238 Ok(manager) => manager,
239 Err(err) => {
240 return Err(RuntimeError::new(
241 RuntimeErrorCode::PluginSessionManager,
242 err.to_string(),
243 ));
244 }
245 };
246
247 self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
248 let finalized = match plugins
249 .finalize_turn_with_phase_probe(
250 assembled,
251 manager.state_service(),
252 manager.lifecycle_service(),
253 manager.graph_service(),
254 self.turn_phase_probe.clone(),
255 )
256 .await
257 {
258 Ok(finalized) => finalized,
259 Err(err) => {
260 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
261 return Err(RuntimeError::new(
262 RuntimeErrorCode::PluginFinalizeTurn,
263 err.to_string(),
264 ));
265 }
266 };
267 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
268
269 let mut returned_turn = finalized.turn;
270 self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
271 self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
272 let queued_work_completion_trace = queued_work_completions.clone();
273 if let Err(err) = turn_pipeline
274 .final_commit(
275 &mut returned_turn,
276 self.session.as_mut(),
277 &turn_usage_delta,
278 Some(&trace_turn_id),
279 queued_work_completions,
280 )
281 .await
282 {
283 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
284 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
285 return Err(err);
286 }
287 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
288
289 emit_session_events_to_sink(events, finalized.events).await;
290 self.state = turn_pipeline.into_final_state();
291 if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
292 && let Some(session) = self.session.as_mut()
293 {
294 let protocol_session = Arc::clone(session.plugins().protocol_session());
295 let session_id = self.state.session_id.clone();
296 protocol_session
297 .restore_session(
298 crate::plugin::ProtocolSessionContext::new(session, &session_id),
299 &self.state,
300 )
301 .await
302 .map_err(|err| {
303 RuntimeError::new(
304 RuntimeErrorCode::Other("protocol_restore_session".to_string()),
305 err.to_string(),
306 )
307 })?;
308 }
309 if !queued_work_completion_trace.is_empty() {
310 crate::trace::emit_trace(
311 &self.host.core.tracing.trace_sink,
312 &self.host.core.tracing.trace_context,
313 lash_trace::TraceContext::default()
314 .for_session(returned_turn.state.session_id.clone())
315 .for_turn_index(returned_turn.state.turn_index)
316 .for_turn(trace_turn_id.clone()),
317 lash_trace::TraceEvent::Custom {
318 name: "queued_work.completed".to_string(),
319 payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
320 },
321 );
322 }
323 self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
324 self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
325 .await?;
326 self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
327 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
328
329 self.emit_completed_turn_trace(
330 &returned_turn.state,
331 &returned_turn.outcome,
332 &trace_turn_id,
333 );
334 Ok(returned_turn)
335 }
336
337 fn emit_completed_turn_trace(
338 &self,
339 state: &SessionSnapshot,
340 outcome: &TurnOutcome,
341 trace_turn_id: &str,
342 ) {
343 if self.host.core.tracing.trace_sink.is_none() {
344 return;
345 }
346
347 let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
348 crate::trace::emit_trace(
349 &self.host.core.tracing.trace_sink,
350 &self.host.core.tracing.trace_context,
351 lash_trace::TraceContext::default()
352 .for_session(state.session_id.clone())
353 .for_turn_index(state.turn_index)
354 .for_turn(trace_turn_id.to_string()),
355 lash_trace::TraceEvent::TurnCompleted {
356 status: status.to_string(),
357 done_reason: done_reason.to_string(),
358 agent_frame_switch,
359 },
360 );
361 }
362
363 async fn emit_turn_persisted_event(
364 &self,
365 returned_turn: &AssembledTurn,
366 scoped_effect_controller: &ScopedEffectController<'_>,
367 trace_turn_id: &str,
368 ) -> Result<(), RuntimeError> {
369 let Some(session) = self.session.as_ref() else {
370 return Ok(());
371 };
372 let Ok(manager) = self.runtime_session_services() else {
373 return Ok(());
374 };
375 let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
376 let phase_controller = scoped_child_turn_controller(
377 scoped_effect_controller,
378 &self.state.session_id,
379 &phase_turn_id,
380 )?;
381 let direct_completions = manager.direct_completion_client(
382 RuntimeEffectControllerHandle::borrowed(phase_controller),
383 Some(phase_turn_id),
384 );
385
386 session
387 .plugins()
388 .emit_runtime_event_with_phase_probe(
389 crate::PluginLifecycleEvent::TurnPersisted(Box::new(
390 crate::SessionStateChangedContext {
391 session_id: self.state.session_id.clone(),
392 state: crate::SessionReadView::from_snapshot(&returned_turn.state),
393 sessions: manager.state_service(),
394 session_graph: manager.graph_service(),
395 direct_completions,
396 },
397 )),
398 self.turn_phase_probe.clone(),
399 )
400 .await;
401 Ok(())
402 }
403
404 pub async fn stream_turn(
406 &mut self,
407 input: TurnInput,
408 opts: TurnOptions<'_>,
409 ) -> Result<AssembledTurn, RuntimeError> {
410 let cancel = opts.cancel.clone();
411 let scoped_effect_controller = opts.scoped_effect_controller();
412 self.stream_turn_with_scoped_effect_controller_inner(
413 input,
414 opts.events_or_noop(),
415 opts.turn_events_or_noop(),
416 scoped_effect_controller,
417 cancel,
418 None,
419 )
420 .await
421 }
422
423 pub async fn stream_next_queued_work(
424 &mut self,
425 opts: TurnOptions<'_>,
426 ) -> Result<Option<AssembledTurn>, RuntimeError> {
427 self.stream_queued_work(opts, None).await
428 }
429
430 pub async fn stream_selected_queued_work(
431 &mut self,
432 opts: TurnOptions<'_>,
433 batch_ids: &[String],
434 ) -> Result<Option<AssembledTurn>, RuntimeError> {
435 self.stream_queued_work(opts, Some(batch_ids)).await
436 }
437
438 async fn stream_queued_work(
439 &mut self,
440 opts: TurnOptions<'_>,
441 selected_batch_ids: Option<&[String]>,
442 ) -> Result<Option<AssembledTurn>, RuntimeError> {
443 if self.drain_next_session_command().await?.is_some() {
444 return Ok(None);
445 }
446 let Some(store) = self
447 .session
448 .as_ref()
449 .and_then(|session| session.history_store())
450 else {
451 return Ok(None);
452 };
453 let claim = if let Some(batch_ids) = selected_batch_ids {
454 store
455 .claim_ready_queued_work_by_batch_ids(
456 &self.state.session_id,
457 &self.runtime_scope_id,
458 crate::QueuedWorkClaimBoundary::Idle,
459 crate::QUEUED_WORK_CLAIM_TTL_MS,
460 batch_ids,
461 )
462 .await
463 } else {
464 store
465 .claim_ready_queued_work(
466 &self.state.session_id,
467 &self.runtime_scope_id,
468 crate::QueuedWorkClaimBoundary::Idle,
469 crate::QUEUED_WORK_CLAIM_TTL_MS,
470 64,
471 )
472 .await
473 }
474 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
475 let Some(claim) = claim else {
476 return Ok(None);
477 };
478 let mut work = claim.materialize_for_turn();
479 let turn_id = work
480 .input
481 .trace_turn_id
482 .clone()
483 .or_else(|| Some(opts.effect_scope_id().to_owned()))
484 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
485 work.input.trace_turn_id = Some(turn_id.clone());
486 let causes = work.turn_causes.clone();
487 emit_queued_work_started_to_sink(
488 opts.turn_events_or_noop(),
489 crate::QueuedWorkClaimBoundary::Idle,
490 &claim,
491 causes.clone(),
492 )
493 .await;
494 crate::trace::emit_trace(
495 &self.host.core.tracing.trace_sink,
496 &self.host.core.tracing.trace_context,
497 lash_trace::TraceContext::default()
498 .for_session(self.state.session_id.clone())
499 .for_turn_index(self.state.turn_index + 1)
500 .for_turn(turn_id.clone()),
501 lash_trace::TraceEvent::Custom {
502 name: "queued_work.claimed".to_string(),
503 payload: queued_work_trace_payload(
504 crate::QueuedWorkClaimBoundary::Idle,
505 &claim,
506 &causes,
507 ),
508 },
509 );
510 let cancel = opts.cancel.clone();
511 let scoped_effect_controller = opts.scoped_effect_controller();
512 self.stream_turn_with_scoped_effect_controller_inner(
513 work.input,
514 opts.events_or_noop(),
515 opts.turn_events_or_noop(),
516 scoped_effect_controller,
517 cancel,
518 Some(claim),
519 )
520 .await
521 .map(Some)
522 }
523
524 fn ensure_durable_store_facets_for_scope(
532 &self,
533 scoped_effect_controller: &ScopedEffectController<'_>,
534 ) -> Result<(), RuntimeError> {
535 if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
536 {
537 return Ok(());
538 }
539 if self
540 .host
541 .core
542 .durability
543 .attachment_store
544 .persistence()
545 .durability_tier()
546 != crate::DurabilityTier::Durable
547 {
548 return Err(RuntimeError::durable_store_required(
549 crate::DurableStoreFacet::AttachmentStore,
550 ));
551 }
552 if self
553 .host
554 .core
555 .durability
556 .lashlang_artifact_store
557 .durability_tier()
558 != crate::DurabilityTier::Durable
559 {
560 return Err(RuntimeError::durable_store_required(
561 crate::DurableStoreFacet::ArtifactStore,
562 ));
563 }
564 if let Some(store) = self
565 .session
566 .as_ref()
567 .and_then(|session| session.history_store())
568 && store.durability_tier() != crate::DurabilityTier::Durable
569 {
570 return Err(RuntimeError::durable_store_required(
571 crate::DurableStoreFacet::SessionStore,
572 ));
573 }
574 if let Some(process_registry) = self.host.process_registry.as_ref()
575 && process_registry.durability_tier() != crate::DurabilityTier::Durable
576 {
577 return Err(RuntimeError::durable_store_required(
578 crate::DurableStoreFacet::ProcessRegistry,
579 ));
580 }
581 Ok(())
582 }
583
584 async fn stream_turn_with_scoped_effect_controller_inner(
585 &mut self,
586 mut input: TurnInput,
587 events: &dyn EventSink,
588 turn_events: &dyn TurnActivitySink,
589 scoped_effect_controller: ScopedEffectController<'_>,
590 cancel: CancellationToken,
591 queued_claim: Option<crate::QueuedWorkClaim>,
592 ) -> Result<AssembledTurn, RuntimeError> {
593 if queued_claim.is_none() {
594 while self.drain_next_session_command().await?.is_some() {}
595 }
596 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
597 && scoped_effect_controller
598 .effect_scope()
599 .validates_turn_trace_id()
600 && input_turn_id != scoped_effect_controller.scope_id()
601 {
602 return Err(RuntimeError::new(
603 RuntimeErrorCode::EffectScopeTurnIdMismatch,
604 format!(
605 "input trace_turn_id `{input_turn_id}` does not match effect scope id `{}`",
606 scoped_effect_controller.scope_id()
607 ),
608 ));
609 }
610 self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
611 input
612 .trace_turn_id
613 .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
614 self.stream_turn_inner(
615 input.clone(),
616 events,
617 turn_events,
618 scoped_effect_controller,
619 cancel.clone(),
620 queued_claim,
621 )
622 .await
623 }
624
625 pub async fn stream_turn_with_agent_frames(
633 &mut self,
634 input: TurnInput,
635 opts: TurnOptions<'_>,
636 ) -> Result<AgentFrameRun, RuntimeError> {
637 let cancel = opts.cancel.clone();
638 let scoped_effect_controller = opts.scoped_effect_controller();
639 self.stream_turn_with_agent_frames_inner(
640 input,
641 opts.events_or_noop(),
642 opts.turn_events_or_noop(),
643 scoped_effect_controller,
644 cancel,
645 )
646 .await
647 }
648
649 async fn stream_turn_with_agent_frames_inner(
650 &mut self,
651 mut input: TurnInput,
652 events: &dyn EventSink,
653 turn_events: &dyn TurnActivitySink,
654 scoped_effect_controller: ScopedEffectController<'_>,
655 cancel: CancellationToken,
656 ) -> Result<AgentFrameRun, RuntimeError> {
657 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
658 && scoped_effect_controller
659 .effect_scope()
660 .validates_turn_trace_id()
661 && input_turn_id != scoped_effect_controller.scope_id()
662 {
663 return Err(RuntimeError::new(
664 RuntimeErrorCode::EffectScopeTurnIdMismatch,
665 format!(
666 "input trace_turn_id `{input_turn_id}` does not match effect scope id `{}`",
667 scoped_effect_controller.scope_id()
668 ),
669 ));
670 }
671 let follow_protocol_turn_options = input.protocol_turn_options.clone();
672 let follow_turn_context = input.turn_context.clone();
673 let follow_trace_turn_id = input
674 .trace_turn_id
675 .clone()
676 .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
677 input
678 .trace_turn_id
679 .get_or_insert(follow_trace_turn_id.clone());
680 let mut turns = Vec::new();
681 loop {
682 let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
683 input.trace_turn_id = Some(turn_trace_turn_id.clone());
684 let turn_effect_controller = if turns.is_empty() {
685 scoped_effect_controller.clone()
686 } else {
687 ScopedEffectController::borrowed(
688 scoped_effect_controller.controller(),
689 EffectScope::turn(&self.state.session_id, &turn_trace_turn_id),
690 )?
691 };
692 let turn = self
693 .stream_turn_with_scoped_effect_controller_inner(
694 input,
695 events,
696 turn_events,
697 turn_effect_controller,
698 cancel.clone(),
699 None,
700 )
701 .await?;
702 let switched_frame = match &turn.outcome {
703 TurnOutcome::AgentFrameSwitch { frame_id, task } => {
704 Some((frame_id.clone(), task.clone()))
705 }
706 _ => None,
707 };
708 turns.push(turn);
709
710 let Some((_frame_id, task)) = switched_frame else {
711 return Ok(AgentFrameRun { turns });
712 };
713 input = turn_input_from_text(task);
714 input.protocol_turn_options = follow_protocol_turn_options.clone();
715 input.turn_context = follow_turn_context.clone();
716 }
717 }
718
719 async fn stream_turn_inner(
720 &mut self,
721 mut input: TurnInput,
722 events: &dyn EventSink,
723 turn_events: &dyn TurnActivitySink,
724 scoped_effect_controller: ScopedEffectController<'_>,
725 cancel: CancellationToken,
726 queued_claim: Option<crate::QueuedWorkClaim>,
727 ) -> Result<AssembledTurn, RuntimeError> {
728 self.refresh_session_graph_from_store()
729 .await
730 .map_err(session_head_refresh_error)?;
731 let input_trace_turn_id = input.trace_turn_id.clone();
732 let queued_turn_work = queued_claim
733 .as_ref()
734 .map(crate::QueuedWorkClaim::materialize_for_turn);
735 if let Some(work) = queued_turn_work.as_ref()
736 && input.items.is_empty()
737 && input.image_blobs.is_empty()
738 {
739 input = work.input.clone();
740 if input.trace_turn_id.is_none() {
741 input.trace_turn_id = input_trace_turn_id;
742 }
743 }
744 if self
745 .session
746 .as_ref()
747 .and_then(|session| session.history_store())
748 .is_some()
749 {
750 ensure_durable_effect_input(&input)?;
751 }
752 if let Some(extension) = &input.protocol_extension
753 && let Some(session) = self.session.as_ref()
754 {
755 let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
756 protocol_session
757 .validate_turn_extension(extension)
758 .await
759 .map_err(|err| {
760 RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
761 })?;
762 }
763 let previous_prompt_usage = self.state.last_prompt_usage.clone();
764 let normalized = match self.normalize_input_items(&input.items, &input.image_blobs) {
765 Ok(items) => items,
766 Err(e) => {
767 self.state.last_prompt_usage = None;
768 let mut assembler = TurnAssembler::default();
769 let error_event = SessionEvent::Error {
770 message: e.clone(),
771 envelope: Some(crate::session_model::ErrorEnvelope {
772 kind: "input_validation".to_string(),
773 code: Some("invalid_turn_input".to_string()),
774 terminal_reason: None,
775 user_message: e.clone(),
776 raw: None,
777 }),
778 };
779 assembler.push(&error_event);
780 emit_turn_activity_to_sink(
781 turn_events,
782 TurnActivity::independent(TurnEvent::Error { message: e }),
783 )
784 .await;
785 emit_session_event_to_sink(events, error_event).await;
786 let outcome_event = SessionEvent::TurnOutcome {
787 outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
788 };
789 assembler.push(&outcome_event);
790 emit_session_event_to_sink(events, outcome_event).await;
791 assembler.push(&SessionEvent::Done);
792 emit_session_event_to_sink(events, SessionEvent::Done).await;
793 return Ok(assembler.finish(
794 self.state.to_snapshot(),
795 false,
796 None,
797 &self.host.core.control.termination,
798 ));
799 }
800 };
801 let turn_index = self.state.turn_index + 1;
802 let trace_turn_id = input
803 .trace_turn_id
804 .clone()
805 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
806 if self.host.core.tracing.trace_sink.is_some() {
807 let mut trace_metadata = std::collections::BTreeMap::new();
808 trace_metadata.insert(
809 "input_item_count".to_string(),
810 serde_json::json!(normalized.len()),
811 );
812 crate::trace::emit_trace(
813 &self.host.core.tracing.trace_sink,
814 &self.host.core.tracing.trace_context,
815 lash_trace::TraceContext::default()
816 .for_session(self.state.session_id.clone())
817 .for_turn_index(turn_index)
818 .for_turn(trace_turn_id.clone()),
819 lash_trace::TraceEvent::TurnStarted {
820 metadata: trace_metadata,
821 },
822 );
823 }
824
825 let base_read_model = self.state.read_model();
826 let base_messages = base_read_model.messages;
827 let base_render_cache = base_read_model.prompt_render_cache;
828 let mut turn_delta = Vec::new();
829 let initial_turn_causes = queued_turn_work
830 .as_ref()
831 .map(|work| work.turn_causes.clone())
832 .unwrap_or_default();
833 turn_delta.extend(
834 initial_turn_causes
835 .iter()
836 .map(crate::TurnCause::to_event_message),
837 );
838
839 let user_id = fresh_message_id();
840 let mut user_parts: Vec<Part> = Vec::new();
841 for item in normalized {
842 match item {
843 NormalizedItem::Text(text) => {
844 if text.is_empty() {
845 continue;
846 }
847 user_parts.push(Part {
848 id: format!("{}.p{}", user_id, user_parts.len()),
849 kind: PartKind::Text,
850 content: text,
851 attachment: None,
852 tool_call_id: None,
853 tool_name: None,
854 tool_replay: None,
855 prune_state: PruneState::Intact,
856 reasoning_meta: None,
857 response_meta: None,
858 });
859 }
860 NormalizedItem::Image(reference) => {
861 user_parts.push(Part {
862 id: format!("{}.p{}", user_id, user_parts.len()),
863 kind: PartKind::Image,
864 content: String::new(),
865 attachment: Some(crate::session_model::message::PartAttachment {
866 reference,
867 }),
868 tool_call_id: None,
869 tool_name: None,
870 tool_replay: None,
871 prune_state: PruneState::Intact,
872 reasoning_meta: None,
873 response_meta: None,
874 });
875 }
876 }
877 }
878 if user_parts.is_empty() && initial_turn_causes.is_empty() {
879 user_parts.push(Part {
880 id: format!("{}.p0", user_id),
881 kind: PartKind::Text,
882 content: String::new(),
883 attachment: None,
884 tool_call_id: None,
885 tool_name: None,
886 tool_replay: None,
887 prune_state: PruneState::Intact,
888 reasoning_meta: None,
889 response_meta: None,
890 });
891 }
892 if !user_parts.is_empty() {
893 reassign_part_ids(&user_id, &mut user_parts);
894 turn_delta.push(Message {
895 id: user_id.clone(),
896 role: MessageRole::User,
897 parts: shared_parts(user_parts),
898 origin: None,
899 });
900 }
901
902 let manager = self
903 .runtime_session_services_for_turn(None)
904 .map_err(|err| {
905 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
906 })?;
907 let plugin_session = self
908 .session
909 .as_ref()
910 .map(|s| Arc::clone(s.plugins()))
911 .ok_or_else(|| {
912 RuntimeError::new(
913 RuntimeErrorCode::ContextPrepareTurn,
914 "runtime session not available",
915 )
916 })?;
917 let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
918 let prepare_phase_controller = scoped_child_turn_controller(
919 &scoped_effect_controller,
920 &self.state.session_id,
921 &prepare_phase_turn_id,
922 )?;
923 let turn_ctx = crate::TurnTransformContext {
924 session_id: self.state.session_id.clone(),
925 state: self.read_view(),
926 prompt_usage: previous_prompt_usage.clone(),
927 max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
928 sessions: manager.state_service(),
929 session_lifecycle: manager.lifecycle_service(),
930 session_graph: manager.graph_service(),
931 scoped_effect_controller: scoped_effect_controller.clone(),
932 direct_completions: manager.direct_completion_client(
933 RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
934 Some(prepare_phase_turn_id),
935 ),
936 };
937 self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
938 let prepared_context = plugin_session
939 .prepare_turn_context(
940 &turn_ctx,
941 crate::session_model::context::PreparedContext {
942 messages: crate::MessageSequence::from_base_and_delta(
943 base_messages,
944 turn_delta,
945 )
946 .with_base_render_cache(base_render_cache),
947 ..Default::default()
948 },
949 self.turn_phase_probe.clone(),
950 )
951 .await
952 .map_err(|err| {
953 RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
954 })?;
955 self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
956 drop(turn_ctx);
961 let messages = prepared_context.messages;
962 if let Some(session) = self.session.as_mut() {
963 session
964 .set_context_surface(
965 prepared_context.tool_providers,
966 prepared_context.prompt_contributions,
967 prepared_context.include_base_tools,
968 )
969 .map_err(|err| {
970 RuntimeError::new(
971 RuntimeErrorCode::Other("session_tool_registry".to_string()),
972 err.to_string(),
973 )
974 })?;
975 }
976
977 self.state.last_prompt_usage = None;
978
979 self.stream_prepared_turn(
980 messages,
981 previous_prompt_usage,
982 input.protocol_turn_options.clone(),
983 input.protocol_extension.clone(),
984 input.turn_context.clone(),
985 initial_turn_causes,
986 trace_turn_id,
987 turn_index,
988 events,
989 turn_events,
990 scoped_effect_controller,
991 cancel,
992 queued_claim,
993 )
994 .await
995 }
996
997 pub async fn run_turn_assembled(
999 &mut self,
1000 input: TurnInput,
1001 cancel: CancellationToken,
1002 scoped_effect_controller: ScopedEffectController<'_>,
1003 ) -> Result<AssembledTurn, RuntimeError> {
1004 self.stream_turn(input, TurnOptions::new(cancel, scoped_effect_controller))
1005 .await
1006 }
1007
1008 #[allow(clippy::too_many_arguments)]
1010 pub async fn stream_prepared_turn(
1011 &mut self,
1012 messages: crate::MessageSequence,
1013 _previous_prompt_usage: Option<PromptUsage>,
1014 protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1015 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1016 turn_context: crate::TurnContext,
1017 initial_turn_causes: Vec<crate::TurnCause>,
1018 trace_turn_id: String,
1019 turn_index: usize,
1020 events: &dyn EventSink,
1021 turn_events: &dyn TurnActivitySink,
1022 scoped_effect_controller: ScopedEffectController<'_>,
1023 cancel: CancellationToken,
1024 initial_queue_claim: Option<crate::QueuedWorkClaim>,
1025 ) -> Result<AssembledTurn, RuntimeError> {
1026 let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
1027 let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
1028 let mut turn_policy = self.state.effective_policy().clone();
1029 let turn_provider_override = turn_context.provider().cloned();
1030 if let Some(provider) = turn_provider_override.as_ref() {
1031 turn_policy.provider_id = provider.kind().to_string();
1032 }
1033 if let Some(model) = turn_context.model_spec() {
1034 turn_policy.model = model.clone();
1035 }
1036 let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
1037 let effective_protocol_turn_options = protocol_turn_options
1038 .clone()
1039 .map(|options| session_protocol_turn_options.merged_with_override(&options))
1040 .unwrap_or(session_protocol_turn_options);
1041 let manager = self
1042 .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1043 .map_err(|err| {
1044 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1045 })?;
1046 let plugins = {
1047 let session = self
1048 .session
1049 .as_ref()
1050 .expect("lash runtime session must be available");
1051 Arc::clone(session.plugins())
1052 };
1053 let mut assembler = TurnAssembler::new();
1054 self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
1055 let prepared = {
1061 let prepare_turn = plugins.prepare_turn_with_phase_probe(
1062 PrepareTurnRequest {
1063 session_id: self.state.session_id.clone(),
1064 state: crate::SessionReadView::from_runtime_state(
1065 &self.state,
1066 turn_policy.clone(),
1067 effective_protocol_turn_options.clone(),
1068 ),
1069 messages,
1070 sessions: manager.state_service(),
1071 session_lifecycle: manager.lifecycle_service(),
1072 session_graph: manager.graph_service(),
1073 turn_context: turn_context.clone(),
1074 },
1075 self.turn_phase_probe.clone(),
1076 );
1077 tokio::pin!(prepare_turn);
1078
1079 loop {
1080 tokio::select! {
1081 prepared = &mut prepare_turn => {
1082 let prepared = prepared.map_err(|err| {
1083 RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
1084 })?;
1085 self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
1086 break prepared;
1087 }
1088 maybe_event = event_rx.recv() => {
1089 if let Some(event) = maybe_event {
1090 emit_runtime_stream_event_to_sinks(
1091 events,
1092 turn_events,
1093 event,
1094 &mut assembler,
1095 )
1096 .await;
1097 }
1098 }
1099 }
1100 }
1101 };
1102 for event in &prepared.events {
1103 assembler.push(event);
1104 }
1105 emit_session_events_to_sink(events, prepared.events).await;
1106 if let Some(abort) = prepared.abort {
1107 drop(event_tx);
1108
1109 let mut turn_pipeline = TurnBoundary::from_state(self.state.clone());
1110 turn_pipeline.apply_prepared_messages(&prepared.messages);
1111 let state = turn_pipeline.into_final_state();
1112 let issue = TurnIssue {
1113 kind: "plugin".to_string(),
1114 code: Some(abort.code),
1115 terminal_reason: None,
1116 message: abort.message.clone(),
1117 raw: None,
1118 };
1119 let error_event = SessionEvent::Error {
1120 message: abort.message,
1121 envelope: Some(crate::session_model::ErrorEnvelope {
1122 kind: "plugin".to_string(),
1123 code: issue.code.clone(),
1124 terminal_reason: None,
1125 user_message: issue.message.clone(),
1126 raw: None,
1127 }),
1128 };
1129 assembler.push(&error_event);
1130 emit_turn_activity_to_sink(
1131 turn_events,
1132 TurnActivity::independent(TurnEvent::Error {
1133 message: issue.message.clone(),
1134 }),
1135 )
1136 .await;
1137 emit_session_event_to_sink(events, error_event).await;
1138 let outcome_event = SessionEvent::TurnOutcome {
1139 outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
1140 };
1141 assembler.push(&outcome_event);
1142 emit_session_event_to_sink(events, outcome_event).await;
1143 assembler.push(&SessionEvent::Done);
1144 emit_session_event_to_sink(events, SessionEvent::Done).await;
1145 return Ok(assembler.finish(
1146 state.to_snapshot(),
1147 cancel.is_cancelled(),
1148 Some(issue),
1149 &self.host.core.control.termination,
1150 ));
1151 }
1152 let mut turn_pipeline = TurnBoundary::from_state(self.state.clone());
1153 let store = self
1154 .session
1155 .as_ref()
1156 .and_then(|session| session.history_store());
1157 turn_pipeline
1158 .prepared_checkpoint(
1159 store.as_ref().map(|store| store.as_ref()),
1160 turn_policy.clone(),
1161 turn_index,
1162 &prepared.messages,
1163 self.session.as_mut(),
1164 )
1165 .await
1166 .map_err(|err| {
1167 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
1168 })?;
1169 let resolved_turn_policy = if let Some(provider) = turn_provider_override {
1170 RuntimeSessionPolicy::from_provider(turn_policy.clone(), provider)
1171 .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1172 } else {
1173 self.host
1174 .resolve_session_policy(&self.state.session_id, turn_policy.clone())
1175 .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1176 };
1177 let manager = self
1178 .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1179 .map_err(|err| {
1180 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1181 })?;
1182 let cancel_state = cancel.clone();
1183 let finish_scoped_effect_controller = scoped_effect_controller.clone();
1184 let session = self
1185 .session
1186 .take()
1187 .expect("lash runtime session must be available");
1188 let mut driver = RuntimeTurnDriver {
1189 session,
1190 policy: resolved_turn_policy,
1191 host: self.host.clone(),
1192 turn_id: scoped_effect_controller.scope_id().to_string(),
1193 scoped_effect_controller,
1194 session_id: self.state.session_id.clone(),
1195 turn_index,
1196 turn_pipeline,
1197 llm_stream_summaries: HashMap::new(),
1198 next_llm_ordinal: 0,
1199 session_services: manager,
1200 protocol_turn_options: effective_protocol_turn_options,
1201 protocol_extension,
1202 turn_context,
1203 turn_causes: initial_turn_causes,
1204 pending_queue_claims: initial_queue_claim.into_iter().collect(),
1205 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1206 turn_phase_probe: self.turn_phase_probe.clone(),
1207 };
1208 let protocol_run_offset = 0;
1209 self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
1210 let run_result = drive_turn_to_completion(
1211 driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
1212 &mut event_rx,
1213 &mut assembler,
1214 &child_usage_event_relay,
1215 events,
1216 turn_events,
1217 )
1218 .await;
1219 let (new_messages, _new_protocol_iteration) = match run_result {
1220 Ok(result) => result,
1221 Err(err) => {
1222 self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1223 let RuntimeTurnDriver { session, .. } = driver;
1224 self.session = Some(session);
1225 return Err(err);
1226 }
1227 };
1228 self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1229 tracing::debug!(
1230 new_message_count = new_messages.len(),
1231 tool_call_count = assembler.tool_calls.len(),
1232 "runtime post-run_task"
1233 );
1234
1235 let RuntimeTurnDriver {
1236 session,
1237 policy,
1238 turn_pipeline,
1239 pending_queue_claims,
1240 ..
1241 } = driver;
1242 self.session = Some(session);
1243 self.finish_turn(
1244 TurnFinishInput {
1245 turn_pipeline,
1246 assembler,
1247 new_messages,
1248 policy,
1249 turn_index,
1250 queued_work_completions: pending_queue_claims
1251 .iter()
1252 .map(crate::QueuedWorkClaim::completion)
1253 .collect(),
1254 trace_turn_id,
1255 },
1256 events,
1257 &finish_scoped_effect_controller,
1258 &cancel_state,
1259 )
1260 .await
1261 }
1262 fn normalize_input_items(
1263 &self,
1264 items: &[InputItem],
1265 image_blobs: &HashMap<String, Vec<u8>>,
1266 ) -> Result<Vec<NormalizedItem>, String> {
1267 normalize_input_items(
1268 items,
1269 image_blobs,
1270 self.host.core.durability.attachment_store.as_ref(),
1271 )
1272 }
1273}
1274
1275fn turn_input_from_text(text: String) -> TurnInput {
1276 TurnInput {
1277 items: vec![InputItem::Text { text }],
1278 image_blobs: HashMap::new(),
1279 protocol_turn_options: None,
1280 trace_turn_id: None,
1281 protocol_extension: None,
1282 turn_context: crate::TurnContext::default(),
1283 }
1284}
1285
1286fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
1287 if completed_turn_count == 0 {
1288 root_turn_id.to_string()
1289 } else {
1290 format!("{root_turn_id}:agent-frame:{completed_turn_count}")
1291 }
1292}
1293
1294pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1295 if input.protocol_extension.is_some() {
1296 return Err(RuntimeError::new(
1297 RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1298 "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1299 ));
1300 }
1301 input
1302 .turn_context
1303 .live_plugin_inputs()
1304 .durable_effect_rejection()?;
1305 Ok(())
1306}
1307
1308async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1309 if !events.is_noop() {
1310 events.emit(activity).await;
1311 }
1312}
1313
1314async fn drive_turn_to_completion<F>(
1324 run_future: F,
1325 event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1326 assembler: &mut TurnAssembler,
1327 child_usage_event_relay: &ChildUsageEventRelay,
1328 events: &dyn EventSink,
1329 turn_events: &dyn TurnActivitySink,
1330) -> Result<(crate::MessageSequence, usize), RuntimeError>
1331where
1332 F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1333{
1334 let run_result = {
1335 tokio::pin!(run_future);
1336 loop {
1337 tokio::select! {
1338 maybe_event = event_rx.recv() => {
1339 if let Some(event) = maybe_event {
1340 emit_runtime_stream_event_to_sinks(
1341 events,
1342 turn_events,
1343 event,
1344 assembler,
1345 )
1346 .await;
1347 }
1348 }
1349 completed = &mut run_future => {
1350 child_usage_event_relay.clear();
1351 break completed;
1352 }
1353 }
1354 }
1355 };
1356 while let Some(event) = event_rx.recv().await {
1357 emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1358 }
1359 run_result
1360}
1361
1362async fn emit_runtime_stream_event_to_sinks(
1363 events: &dyn EventSink,
1364 turn_events: &dyn TurnActivitySink,
1365 event: RuntimeStreamEvent,
1366 assembler: &mut TurnAssembler,
1367) {
1368 match event {
1369 RuntimeStreamEvent::Session(event) => {
1370 assembler.push(&event);
1371 emit_session_event_to_sink(events, event).await;
1372 }
1373 RuntimeStreamEvent::Turn(activity) => {
1374 assembler.push_turn_activity(&activity);
1375 emit_turn_activity_to_sink(turn_events, activity).await;
1376 }
1377 }
1378}
1379
1380#[cfg(test)]
1381mod tests {
1382 use super::agent_frame_follow_turn_id;
1383
1384 #[test]
1385 fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
1386 assert_eq!(agent_frame_follow_turn_id("root-turn", 0), "root-turn");
1387 assert_eq!(
1388 agent_frame_follow_turn_id("root-turn", 1),
1389 "root-turn:agent-frame:1"
1390 );
1391 assert_eq!(
1392 agent_frame_follow_turn_id("root-turn", 2),
1393 "root-turn:agent-frame:2"
1394 );
1395 }
1396}