1use super::*;
2
3fn trace_fields_from_outcome(
4 outcome: &TurnOutcome,
5) -> (
6 &'static str,
7 &'static str,
8 Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10 match outcome {
11 TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12 ("completed", "assistant_message", None)
13 }
14 TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
15 ("completed", "submitted_value", None)
16 }
17 TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
18 TurnOutcome::AgentFrameSwitch { frame_id } => (
19 "completed",
20 "agent_frame_switch",
21 Some(lash_trace::TraceAgentFrameSwitch {
22 frame_id: frame_id.clone(),
23 }),
24 ),
25 TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
26 }
27}
28
29fn trace_stop_reason(stop: &TurnStop) -> &'static str {
30 match stop {
31 TurnStop::Cancelled => "cancelled",
32 TurnStop::Incomplete => "incomplete",
33 TurnStop::InvalidInput => "invalid_input",
34 TurnStop::MaxTurns => "max_turns",
35 TurnStop::ToolFailure => "tool_failure",
36 TurnStop::ProviderError => "provider_error",
37 TurnStop::PluginAbort => "plugin_abort",
38 TurnStop::RuntimeError => "runtime_error",
39 TurnStop::SubmittedError { .. } => "submitted_error",
40 TurnStop::ToolError { .. } => "tool_error",
41 }
42}
43
44fn session_head_refresh_error(err: SessionError) -> RuntimeError {
45 RuntimeError::new(
46 RuntimeErrorCode::Other("session_head_refresh".to_string()),
47 err.to_string(),
48 )
49}
50
51fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
52 match payload {
53 crate::QueuedWorkPayload::TurnInput { .. } => "turn_input",
54 crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
55 crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
56 }
57}
58
59fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
60 claim
61 .batches
62 .iter()
63 .map(|batch| batch.batch_id.clone())
64 .collect()
65}
66
/// Builds the id of a turn phase as `<parent_turn_id>:<phase>`.
fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
    let mut id = String::with_capacity(parent_turn_id.len() + 1 + phase.len());
    id.push_str(parent_turn_id);
    id.push(':');
    id.push_str(phase);
    id
}
70
71fn scoped_child_turn_controller<'run>(
72 scoped_effect_controller: &'run ScopedEffectController<'_>,
73 session_id: &str,
74 turn_id: &str,
75) -> Result<ScopedEffectController<'run>, RuntimeError> {
76 ScopedEffectController::borrowed(
77 scoped_effect_controller.controller(),
78 EffectScope::turn(session_id, turn_id),
79 )
80}
81
82pub(in crate::runtime) fn queued_work_trace_payload(
83 boundary: crate::QueuedWorkClaimBoundary,
84 claim: &crate::QueuedWorkClaim,
85 causes: &[crate::TurnCause],
86) -> serde_json::Value {
87 serde_json::json!({
88 "boundary": boundary,
89 "claim_id": claim.claim_id,
90 "owner_id": claim.owner_id,
91 "batch_ids": queued_work_batch_ids(claim),
92 "payload_types": claim.batches.iter()
93 .flat_map(|batch| batch.items.iter())
94 .map(|item| queued_work_payload_type(&item.payload))
95 .collect::<Vec<_>>(),
96 "causes": causes,
97 })
98}
99
100pub(in crate::runtime) fn queued_work_completion_trace_payload(
101 completions: &[crate::QueuedWorkCompletion],
102) -> serde_json::Value {
103 serde_json::json!({
104 "claims": completions.iter().map(|completion| {
105 serde_json::json!({
106 "session_id": completion.session_id,
107 "claim_id": completion.claim_id,
108 "batch_ids": completion.batch_ids,
109 })
110 }).collect::<Vec<_>>(),
111 })
112}
113
114async fn emit_queued_work_started_to_sink(
115 events: &dyn TurnActivitySink,
116 boundary: crate::QueuedWorkClaimBoundary,
117 claim: &crate::QueuedWorkClaim,
118 causes: Vec<crate::TurnCause>,
119) {
120 emit_turn_activity_to_sink(
121 events,
122 TurnActivity::independent(TurnEvent::QueuedWorkStarted {
123 boundary,
124 batch_ids: queued_work_batch_ids(claim),
125 causes,
126 }),
127 )
128 .await;
129}
130
131pub(in crate::runtime) async fn send_queued_work_started_event(
132 event_tx: &mpsc::Sender<RuntimeStreamEvent>,
133 boundary: crate::QueuedWorkClaimBoundary,
134 claim: &crate::QueuedWorkClaim,
135 causes: Vec<crate::TurnCause>,
136) {
137 send_turn_activity(
138 event_tx,
139 TurnActivityId::fresh(),
140 TurnEvent::QueuedWorkStarted {
141 boundary,
142 batch_ids: queued_work_batch_ids(claim),
143 causes,
144 },
145 )
146 .await;
147}
148
/// Everything `LashRuntime::finish_turn` needs to assemble, finalize and
/// commit a turn, bundled so the method takes a single owned argument.
struct TurnFinishInput {
    /// Turn boundary whose state is updated, committed and finally turned
    /// into the runtime's new state.
    turn_pipeline: TurnBoundary,
    /// Assembler holding the events and token usage gathered during the turn;
    /// consumed to produce the `AssembledTurn`.
    assembler: TurnAssembler,
    /// Messages handed to `finalize_turn_read_state` on the turn boundary.
    new_messages: crate::MessageSequence,
    /// Policy in effect for the turn; supplies the model id for the token
    /// ledger entry and the provider used to normalize prompt usage.
    policy: RuntimeSessionPolicy,
    /// Turn index written into the turn boundary's state.
    turn_index: usize,
    /// Queued-work completions passed to the final commit and reported in the
    /// `queued_work.completed` trace event.
    queued_work_completions: Vec<crate::QueuedWorkCompletion>,
    /// Turn id attached to the trace events emitted while finishing.
    trace_turn_id: String,
}
158
159impl LashRuntime {
160 fn max_context_tokens(&self) -> usize {
161 self.state.effective_policy().context_window_tokens()
162 }
163 #[doc(hidden)]
164 pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
165 self.turn_phase_probe = Some(probe);
166 }
167
168 fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
169 if let Some(probe) = self.turn_phase_probe.as_ref() {
170 probe.begin(phase);
171 }
172 }
173
174 fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
175 if let Some(probe) = self.turn_phase_probe.as_ref() {
176 probe.end(phase);
177 }
178 }
179
180 async fn finish_turn(
181 &mut self,
182 finish: TurnFinishInput,
183 events: &dyn EventSink,
184 scoped_effect_controller: &ScopedEffectController<'_>,
185 cancel_state: &CancellationToken,
186 ) -> Result<AssembledTurn, RuntimeError> {
187 let TurnFinishInput {
188 mut turn_pipeline,
189 assembler,
190 new_messages,
191 policy,
192 turn_index,
193 queued_work_completions,
194 trace_turn_id,
195 } = finish;
196 self.policy = self.state.effective_policy().clone();
197 turn_pipeline.state_mut().policy = self.policy.clone();
198 turn_pipeline.state_mut().turn_index = turn_index;
199
200 let mut turn_usage_delta = {
201 let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
202 std::mem::take(&mut *ledger)
203 };
204 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
205 turn_usage_delta.push(TokenLedgerEntry {
206 source: "turn".to_string(),
207 model: policy.model.id.clone(),
208 usage: assembler.token_usage.clone(),
209 });
210 }
211 let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
212
213 turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
214 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
215 turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
216 }
217
218 let last_prompt_usage = assembler
219 .last_llm_usage()
220 .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
221 turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
222 let assembled_state = turn_pipeline.export_state_for_assembly();
223 let assembled = assembler.finish(
224 assembled_state,
225 cancel_state.is_cancelled(),
226 None,
227 &self.host.core.control.termination,
228 );
229
230 let Some(session) = self.session.as_ref() else {
231 self.state.apply_snapshot(&assembled.state);
232 self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
233 return Ok(assembled);
234 };
235
236 let plugins = Arc::clone(session.plugins());
237 let manager = match self.runtime_session_services_for_turn(None) {
238 Ok(manager) => manager,
239 Err(err) => {
240 return Err(RuntimeError::new(
241 RuntimeErrorCode::PluginSessionManager,
242 err.to_string(),
243 ));
244 }
245 };
246
247 self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
248 let finalized = match plugins
249 .finalize_turn_with_phase_probe(
250 assembled,
251 manager.state_service(),
252 manager.lifecycle_service(),
253 manager.graph_service(),
254 self.turn_phase_probe.clone(),
255 )
256 .await
257 {
258 Ok(finalized) => finalized,
259 Err(err) => {
260 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
261 return Err(RuntimeError::new(
262 RuntimeErrorCode::PluginFinalizeTurn,
263 err.to_string(),
264 ));
265 }
266 };
267 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
268
269 let mut returned_turn = finalized.turn;
270 self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
271 self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
272 let queued_work_completion_trace = queued_work_completions.clone();
273 if let Err(err) = turn_pipeline
274 .final_commit(
275 &mut returned_turn,
276 self.session.as_mut(),
277 &turn_usage_delta,
278 Some(&trace_turn_id),
279 queued_work_completions,
280 )
281 .await
282 {
283 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
284 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
285 return Err(err);
286 }
287 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
288
289 emit_session_events_to_sink(events, finalized.events).await;
290 self.state = turn_pipeline.into_final_state();
291 if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
292 && let Some(session) = self.session.as_mut()
293 {
294 let protocol_session = Arc::clone(session.plugins().protocol_session());
295 let session_id = self.state.session_id.clone();
296 protocol_session
297 .restore_session(
298 crate::plugin::ProtocolSessionContext::new(session, &session_id),
299 &self.state,
300 )
301 .await
302 .map_err(|err| {
303 RuntimeError::new(
304 RuntimeErrorCode::Other("protocol_restore_session".to_string()),
305 err.to_string(),
306 )
307 })?;
308 }
309 if !queued_work_completion_trace.is_empty() {
310 crate::trace::emit_trace(
311 &self.host.core.tracing.trace_sink,
312 &self.host.core.tracing.trace_context,
313 lash_trace::TraceContext::default()
314 .for_session(returned_turn.state.session_id.clone())
315 .for_turn_index(returned_turn.state.turn_index)
316 .for_turn(trace_turn_id.clone()),
317 lash_trace::TraceEvent::Custom {
318 name: "queued_work.completed".to_string(),
319 payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
320 },
321 );
322 }
323 self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
324 self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
325 .await?;
326 self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
327 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
328
329 self.emit_completed_turn_trace(
330 &returned_turn.state,
331 &returned_turn.outcome,
332 &trace_turn_id,
333 );
334 Ok(returned_turn)
335 }
336
337 fn emit_completed_turn_trace(
338 &self,
339 state: &SessionSnapshot,
340 outcome: &TurnOutcome,
341 trace_turn_id: &str,
342 ) {
343 if self.host.core.tracing.trace_sink.is_none() {
344 return;
345 }
346
347 let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
348 crate::trace::emit_trace(
349 &self.host.core.tracing.trace_sink,
350 &self.host.core.tracing.trace_context,
351 lash_trace::TraceContext::default()
352 .for_session(state.session_id.clone())
353 .for_turn_index(state.turn_index)
354 .for_turn(trace_turn_id.to_string()),
355 lash_trace::TraceEvent::TurnCompleted {
356 status: status.to_string(),
357 done_reason: done_reason.to_string(),
358 agent_frame_switch,
359 },
360 );
361 }
362
363 async fn emit_turn_persisted_event(
364 &self,
365 returned_turn: &AssembledTurn,
366 scoped_effect_controller: &ScopedEffectController<'_>,
367 trace_turn_id: &str,
368 ) -> Result<(), RuntimeError> {
369 let Some(session) = self.session.as_ref() else {
370 return Ok(());
371 };
372 let Ok(manager) = self.runtime_session_services() else {
373 return Ok(());
374 };
375 let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
376 let phase_controller = scoped_child_turn_controller(
377 scoped_effect_controller,
378 &self.state.session_id,
379 &phase_turn_id,
380 )?;
381 let direct_completions = manager.direct_completion_client(
382 RuntimeEffectControllerHandle::borrowed(phase_controller),
383 Some(phase_turn_id),
384 );
385
386 session
387 .plugins()
388 .emit_runtime_event_with_phase_probe(
389 crate::PluginLifecycleEvent::TurnPersisted(Box::new(
390 crate::SessionStateChangedContext {
391 session_id: self.state.session_id.clone(),
392 state: crate::SessionReadView::from_snapshot(&returned_turn.state),
393 sessions: manager.state_service(),
394 session_graph: manager.graph_service(),
395 direct_completions,
396 },
397 )),
398 self.turn_phase_probe.clone(),
399 )
400 .await;
401 Ok(())
402 }
403
404 pub async fn stream_turn(
406 &mut self,
407 input: TurnInput,
408 opts: TurnOptions<'_>,
409 ) -> Result<AssembledTurn, RuntimeError> {
410 let cancel = opts.cancel.clone();
411 let scoped_effect_controller = opts.scoped_effect_controller();
412 self.stream_turn_with_scoped_effect_controller_inner(
413 input,
414 opts.events_or_noop(),
415 opts.turn_events_or_noop(),
416 scoped_effect_controller,
417 cancel,
418 None,
419 )
420 .await
421 }
422
423 pub async fn stream_next_queued_work(
424 &mut self,
425 opts: TurnOptions<'_>,
426 ) -> Result<Option<AssembledTurn>, RuntimeError> {
427 self.stream_queued_work(opts, None).await
428 }
429
430 pub async fn stream_selected_queued_work(
431 &mut self,
432 opts: TurnOptions<'_>,
433 batch_ids: &[String],
434 ) -> Result<Option<AssembledTurn>, RuntimeError> {
435 self.stream_queued_work(opts, Some(batch_ids)).await
436 }
437
438 async fn stream_queued_work(
439 &mut self,
440 opts: TurnOptions<'_>,
441 selected_batch_ids: Option<&[String]>,
442 ) -> Result<Option<AssembledTurn>, RuntimeError> {
443 if self.drain_next_session_command().await?.is_some() {
444 return Ok(None);
445 }
446 let Some(store) = self
447 .session
448 .as_ref()
449 .and_then(|session| session.history_store())
450 else {
451 return Ok(None);
452 };
453 let claim = if let Some(batch_ids) = selected_batch_ids {
454 store
455 .claim_ready_queued_work_by_batch_ids(
456 &self.state.session_id,
457 &self.runtime_scope_id,
458 crate::QueuedWorkClaimBoundary::Idle,
459 crate::QUEUED_WORK_CLAIM_TTL_MS,
460 batch_ids,
461 )
462 .await
463 } else {
464 store
465 .claim_ready_queued_work(
466 &self.state.session_id,
467 &self.runtime_scope_id,
468 crate::QueuedWorkClaimBoundary::Idle,
469 crate::QUEUED_WORK_CLAIM_TTL_MS,
470 64,
471 )
472 .await
473 }
474 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
475 let Some(claim) = claim else {
476 return Ok(None);
477 };
478 let mut work = claim.materialize_for_turn();
479 let turn_id = work
480 .input
481 .trace_turn_id
482 .clone()
483 .or_else(|| Some(opts.effect_scope_id().to_owned()))
484 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
485 work.input.trace_turn_id = Some(turn_id.clone());
486 let causes = work.turn_causes.clone();
487 emit_queued_work_started_to_sink(
488 opts.turn_events_or_noop(),
489 crate::QueuedWorkClaimBoundary::Idle,
490 &claim,
491 causes.clone(),
492 )
493 .await;
494 crate::trace::emit_trace(
495 &self.host.core.tracing.trace_sink,
496 &self.host.core.tracing.trace_context,
497 lash_trace::TraceContext::default()
498 .for_session(self.state.session_id.clone())
499 .for_turn_index(self.state.turn_index + 1)
500 .for_turn(turn_id.clone()),
501 lash_trace::TraceEvent::Custom {
502 name: "queued_work.claimed".to_string(),
503 payload: queued_work_trace_payload(
504 crate::QueuedWorkClaimBoundary::Idle,
505 &claim,
506 &causes,
507 ),
508 },
509 );
510 let cancel = opts.cancel.clone();
511 let scoped_effect_controller = opts.scoped_effect_controller();
512 self.stream_turn_with_scoped_effect_controller_inner(
513 work.input,
514 opts.events_or_noop(),
515 opts.turn_events_or_noop(),
516 scoped_effect_controller,
517 cancel,
518 Some(claim),
519 )
520 .await
521 .map(Some)
522 }
523
524 fn ensure_durable_store_facets_for_scope(
532 &self,
533 scoped_effect_controller: &ScopedEffectController<'_>,
534 ) -> Result<(), RuntimeError> {
535 if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
536 {
537 return Ok(());
538 }
539 if self
540 .host
541 .core
542 .durability
543 .attachment_store
544 .persistence()
545 .durability_tier()
546 != crate::DurabilityTier::Durable
547 {
548 return Err(RuntimeError::durable_store_required(
549 crate::DurableStoreFacet::AttachmentStore,
550 ));
551 }
552 if self
553 .host
554 .core
555 .durability
556 .lashlang_artifact_store
557 .durability_tier()
558 != crate::DurabilityTier::Durable
559 {
560 return Err(RuntimeError::durable_store_required(
561 crate::DurableStoreFacet::ArtifactStore,
562 ));
563 }
564 if let Some(store) = self
565 .session
566 .as_ref()
567 .and_then(|session| session.history_store())
568 && store.durability_tier() != crate::DurabilityTier::Durable
569 {
570 return Err(RuntimeError::durable_store_required(
571 crate::DurableStoreFacet::SessionStore,
572 ));
573 }
574 if let Some(process_registry) = self.host.process_registry.as_ref()
575 && process_registry.durability_tier() != crate::DurabilityTier::Durable
576 {
577 return Err(RuntimeError::durable_store_required(
578 crate::DurableStoreFacet::ProcessRegistry,
579 ));
580 }
581 Ok(())
582 }
583
584 async fn stream_turn_with_scoped_effect_controller_inner(
585 &mut self,
586 mut input: TurnInput,
587 events: &dyn EventSink,
588 turn_events: &dyn TurnActivitySink,
589 scoped_effect_controller: ScopedEffectController<'_>,
590 cancel: CancellationToken,
591 queued_claim: Option<crate::QueuedWorkClaim>,
592 ) -> Result<AssembledTurn, RuntimeError> {
593 if queued_claim.is_none() {
594 while self.drain_next_session_command().await?.is_some() {}
595 }
596 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
597 && scoped_effect_controller
598 .effect_scope()
599 .validates_turn_trace_id()
600 && input_turn_id != scoped_effect_controller.scope_id()
601 {
602 return Err(RuntimeError::new(
603 RuntimeErrorCode::EffectScopeTurnIdMismatch,
604 format!(
605 "input trace_turn_id `{input_turn_id}` does not match effect scope id `{}`",
606 scoped_effect_controller.scope_id()
607 ),
608 ));
609 }
610 self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
611 input
612 .trace_turn_id
613 .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
614 self.stream_turn_inner(
615 input.clone(),
616 events,
617 turn_events,
618 scoped_effect_controller,
619 cancel.clone(),
620 queued_claim,
621 )
622 .await
623 }
624
625 pub async fn stream_turn_with_agent_frames(
633 &mut self,
634 input: TurnInput,
635 opts: TurnOptions<'_>,
636 ) -> Result<AgentFrameRun, RuntimeError> {
637 let cancel = opts.cancel.clone();
638 let scoped_effect_controller = opts.scoped_effect_controller();
639 self.stream_turn_with_agent_frames_inner(
640 input,
641 opts.events_or_noop(),
642 opts.turn_events_or_noop(),
643 scoped_effect_controller,
644 cancel,
645 )
646 .await
647 }
648
649 async fn stream_turn_with_agent_frames_inner(
650 &mut self,
651 mut input: TurnInput,
652 events: &dyn EventSink,
653 turn_events: &dyn TurnActivitySink,
654 scoped_effect_controller: ScopedEffectController<'_>,
655 cancel: CancellationToken,
656 ) -> Result<AgentFrameRun, RuntimeError> {
657 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
658 && scoped_effect_controller
659 .effect_scope()
660 .validates_turn_trace_id()
661 && input_turn_id != scoped_effect_controller.scope_id()
662 {
663 return Err(RuntimeError::new(
664 RuntimeErrorCode::EffectScopeTurnIdMismatch,
665 format!(
666 "input trace_turn_id `{input_turn_id}` does not match effect scope id `{}`",
667 scoped_effect_controller.scope_id()
668 ),
669 ));
670 }
671 let follow_protocol_turn_options = input.protocol_turn_options.clone();
672 let follow_turn_context = input.turn_context.clone();
673 let follow_trace_turn_id = input
674 .trace_turn_id
675 .clone()
676 .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
677 input
678 .trace_turn_id
679 .get_or_insert(follow_trace_turn_id.clone());
680 let mut turns = Vec::new();
681 loop {
682 let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
683 input.trace_turn_id = Some(turn_trace_turn_id.clone());
684 let turn_effect_controller = if turns.is_empty() {
685 scoped_effect_controller.clone()
686 } else {
687 ScopedEffectController::borrowed(
688 scoped_effect_controller.controller(),
689 EffectScope::turn(&self.state.session_id, &turn_trace_turn_id),
690 )?
691 };
692 let turn = self
693 .stream_turn_with_scoped_effect_controller_inner(
694 input,
695 events,
696 turn_events,
697 turn_effect_controller,
698 cancel.clone(),
699 None,
700 )
701 .await?;
702 let switched_frame_id = match &turn.outcome {
703 TurnOutcome::AgentFrameSwitch { frame_id } => Some(frame_id.clone()),
704 _ => None,
705 };
706 let next_task = switched_frame_id
707 .as_ref()
708 .and_then(|frame_id| frame_switch_task(&turn, frame_id));
709 turns.push(turn);
710
711 let Some(_frame_id) = switched_frame_id else {
712 return Ok(AgentFrameRun { turns });
713 };
714
715 let task = next_task.ok_or_else(|| {
716 RuntimeError::new(
717 RuntimeErrorCode::Other("agent_frame_missing_task".to_string()),
718 "agent frame switch did not provide a task",
719 )
720 })?;
721 input = turn_input_from_text(task);
722 input.protocol_turn_options = follow_protocol_turn_options.clone();
723 input.turn_context = follow_turn_context.clone();
724 }
725 }
726
727 async fn stream_turn_inner(
728 &mut self,
729 mut input: TurnInput,
730 events: &dyn EventSink,
731 turn_events: &dyn TurnActivitySink,
732 scoped_effect_controller: ScopedEffectController<'_>,
733 cancel: CancellationToken,
734 queued_claim: Option<crate::QueuedWorkClaim>,
735 ) -> Result<AssembledTurn, RuntimeError> {
736 self.refresh_session_graph_from_store()
737 .await
738 .map_err(session_head_refresh_error)?;
739 let input_trace_turn_id = input.trace_turn_id.clone();
740 let queued_turn_work = queued_claim
741 .as_ref()
742 .map(crate::QueuedWorkClaim::materialize_for_turn);
743 if let Some(work) = queued_turn_work.as_ref()
744 && input.items.is_empty()
745 && input.image_blobs.is_empty()
746 {
747 input = work.input.clone();
748 if input.trace_turn_id.is_none() {
749 input.trace_turn_id = input_trace_turn_id;
750 }
751 }
752 if self
753 .session
754 .as_ref()
755 .and_then(|session| session.history_store())
756 .is_some()
757 {
758 ensure_durable_effect_input(&input)?;
759 }
760 if let Some(extension) = &input.protocol_extension
761 && let Some(session) = self.session.as_ref()
762 {
763 let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
764 protocol_session
765 .validate_turn_extension(extension)
766 .await
767 .map_err(|err| {
768 RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
769 })?;
770 }
771 let previous_prompt_usage = self.state.last_prompt_usage.clone();
772 let normalized = match self.normalize_input_items(&input.items, &input.image_blobs) {
773 Ok(items) => items,
774 Err(e) => {
775 self.state.last_prompt_usage = None;
776 let mut assembler = TurnAssembler::default();
777 let error_event = SessionEvent::Error {
778 message: e.clone(),
779 envelope: Some(crate::session_model::ErrorEnvelope {
780 kind: "input_validation".to_string(),
781 code: Some("invalid_turn_input".to_string()),
782 terminal_reason: None,
783 user_message: e.clone(),
784 raw: None,
785 }),
786 };
787 assembler.push(&error_event);
788 emit_turn_activity_to_sink(
789 turn_events,
790 TurnActivity::independent(TurnEvent::Error { message: e }),
791 )
792 .await;
793 emit_session_event_to_sink(events, error_event).await;
794 let outcome_event = SessionEvent::TurnOutcome {
795 outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
796 };
797 assembler.push(&outcome_event);
798 emit_session_event_to_sink(events, outcome_event).await;
799 assembler.push(&SessionEvent::Done);
800 emit_session_event_to_sink(events, SessionEvent::Done).await;
801 return Ok(assembler.finish(
802 self.state.to_snapshot(),
803 false,
804 None,
805 &self.host.core.control.termination,
806 ));
807 }
808 };
809 let turn_index = self.state.turn_index + 1;
810 let trace_turn_id = input
811 .trace_turn_id
812 .clone()
813 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
814 if self.host.core.tracing.trace_sink.is_some() {
815 let mut trace_metadata = std::collections::BTreeMap::new();
816 trace_metadata.insert(
817 "input_item_count".to_string(),
818 serde_json::json!(normalized.len()),
819 );
820 crate::trace::emit_trace(
821 &self.host.core.tracing.trace_sink,
822 &self.host.core.tracing.trace_context,
823 lash_trace::TraceContext::default()
824 .for_session(self.state.session_id.clone())
825 .for_turn_index(turn_index)
826 .for_turn(trace_turn_id.clone()),
827 lash_trace::TraceEvent::TurnStarted {
828 metadata: trace_metadata,
829 },
830 );
831 }
832
833 let base_read_model = self.state.read_model();
834 let base_messages = base_read_model.messages;
835 let base_render_cache = base_read_model.prompt_render_cache;
836 let mut turn_delta = Vec::new();
837 let initial_turn_causes = queued_turn_work
838 .as_ref()
839 .map(|work| work.turn_causes.clone())
840 .unwrap_or_default();
841 turn_delta.extend(
842 initial_turn_causes
843 .iter()
844 .map(crate::TurnCause::to_event_message),
845 );
846
847 let user_id = fresh_message_id();
848 let mut user_parts: Vec<Part> = Vec::new();
849 for item in normalized {
850 match item {
851 NormalizedItem::Text(text) => {
852 if text.is_empty() {
853 continue;
854 }
855 user_parts.push(Part {
856 id: format!("{}.p{}", user_id, user_parts.len()),
857 kind: PartKind::Text,
858 content: text,
859 attachment: None,
860 tool_call_id: None,
861 tool_name: None,
862 tool_replay: None,
863 prune_state: PruneState::Intact,
864 reasoning_meta: None,
865 response_meta: None,
866 });
867 }
868 NormalizedItem::Image(reference) => {
869 user_parts.push(Part {
870 id: format!("{}.p{}", user_id, user_parts.len()),
871 kind: PartKind::Image,
872 content: String::new(),
873 attachment: Some(crate::session_model::message::PartAttachment {
874 reference,
875 }),
876 tool_call_id: None,
877 tool_name: None,
878 tool_replay: None,
879 prune_state: PruneState::Intact,
880 reasoning_meta: None,
881 response_meta: None,
882 });
883 }
884 }
885 }
886 if user_parts.is_empty() && initial_turn_causes.is_empty() {
887 user_parts.push(Part {
888 id: format!("{}.p0", user_id),
889 kind: PartKind::Text,
890 content: String::new(),
891 attachment: None,
892 tool_call_id: None,
893 tool_name: None,
894 tool_replay: None,
895 prune_state: PruneState::Intact,
896 reasoning_meta: None,
897 response_meta: None,
898 });
899 }
900 if !user_parts.is_empty() {
901 reassign_part_ids(&user_id, &mut user_parts);
902 turn_delta.push(Message {
903 id: user_id.clone(),
904 role: MessageRole::User,
905 parts: shared_parts(user_parts),
906 origin: None,
907 });
908 }
909
910 let manager = self
911 .runtime_session_services_for_turn(None)
912 .map_err(|err| {
913 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
914 })?;
915 let plugin_session = self
916 .session
917 .as_ref()
918 .map(|s| Arc::clone(s.plugins()))
919 .ok_or_else(|| {
920 RuntimeError::new(
921 RuntimeErrorCode::ContextPrepareTurn,
922 "runtime session not available",
923 )
924 })?;
925 let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
926 let prepare_phase_controller = scoped_child_turn_controller(
927 &scoped_effect_controller,
928 &self.state.session_id,
929 &prepare_phase_turn_id,
930 )?;
931 let turn_ctx = crate::TurnTransformContext {
932 session_id: self.state.session_id.clone(),
933 state: self.read_view(),
934 prompt_usage: previous_prompt_usage.clone(),
935 max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
936 sessions: manager.state_service(),
937 session_lifecycle: manager.lifecycle_service(),
938 session_graph: manager.graph_service(),
939 scoped_effect_controller: scoped_effect_controller.clone(),
940 direct_completions: manager.direct_completion_client(
941 RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
942 Some(prepare_phase_turn_id),
943 ),
944 };
945 self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
946 let prepared_context = plugin_session
947 .prepare_turn_context(
948 &turn_ctx,
949 crate::session_model::context::PreparedContext {
950 messages: crate::MessageSequence::from_base_and_delta(
951 base_messages,
952 turn_delta,
953 )
954 .with_base_render_cache(base_render_cache),
955 ..Default::default()
956 },
957 self.turn_phase_probe.clone(),
958 )
959 .await
960 .map_err(|err| {
961 RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
962 })?;
963 self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
964 drop(turn_ctx);
969 let messages = prepared_context.messages;
970 if let Some(session) = self.session.as_mut() {
971 session
972 .set_context_surface(
973 prepared_context.tool_providers,
974 prepared_context.prompt_contributions,
975 prepared_context.include_base_tools,
976 )
977 .map_err(|err| {
978 RuntimeError::new(
979 RuntimeErrorCode::Other("session_tool_registry".to_string()),
980 err.to_string(),
981 )
982 })?;
983 }
984
985 self.state.last_prompt_usage = None;
986
987 self.stream_prepared_turn(
988 messages,
989 previous_prompt_usage,
990 input.protocol_turn_options.clone(),
991 input.protocol_extension.clone(),
992 input.turn_context.clone(),
993 initial_turn_causes,
994 trace_turn_id,
995 turn_index,
996 events,
997 turn_events,
998 scoped_effect_controller,
999 cancel,
1000 queued_claim,
1001 )
1002 .await
1003 }
1004
1005 pub async fn run_turn_assembled(
1007 &mut self,
1008 input: TurnInput,
1009 cancel: CancellationToken,
1010 scoped_effect_controller: ScopedEffectController<'_>,
1011 ) -> Result<AssembledTurn, RuntimeError> {
1012 self.stream_turn(input, TurnOptions::new(cancel, scoped_effect_controller))
1013 .await
1014 }
1015
1016 #[allow(clippy::too_many_arguments)]
1018 pub async fn stream_prepared_turn(
1019 &mut self,
1020 messages: crate::MessageSequence,
1021 _previous_prompt_usage: Option<PromptUsage>,
1022 protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1023 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1024 turn_context: crate::TurnContext,
1025 initial_turn_causes: Vec<crate::TurnCause>,
1026 trace_turn_id: String,
1027 turn_index: usize,
1028 events: &dyn EventSink,
1029 turn_events: &dyn TurnActivitySink,
1030 scoped_effect_controller: ScopedEffectController<'_>,
1031 cancel: CancellationToken,
1032 initial_queue_claim: Option<crate::QueuedWorkClaim>,
1033 ) -> Result<AssembledTurn, RuntimeError> {
1034 let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
1035 let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
1036 let mut turn_policy = self.state.effective_policy().clone();
1037 let turn_provider_override = turn_context.provider().cloned();
1038 if let Some(provider) = turn_provider_override.as_ref() {
1039 turn_policy.provider_id = provider.kind().to_string();
1040 }
1041 if let Some(model) = turn_context.model_spec() {
1042 turn_policy.model = model.clone();
1043 }
1044 let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
1045 let effective_protocol_turn_options = protocol_turn_options
1046 .clone()
1047 .map(|options| session_protocol_turn_options.merged_with_override(&options))
1048 .unwrap_or(session_protocol_turn_options);
1049 let manager = self
1050 .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1051 .map_err(|err| {
1052 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1053 })?;
1054 let plugins = {
1055 let session = self
1056 .session
1057 .as_ref()
1058 .expect("lash runtime session must be available");
1059 Arc::clone(session.plugins())
1060 };
1061 let mut assembler = TurnAssembler::new();
1062 self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
1063 let prepared = {
1069 let prepare_turn = plugins.prepare_turn_with_phase_probe(
1070 PrepareTurnRequest {
1071 session_id: self.state.session_id.clone(),
1072 state: crate::SessionReadView::from_runtime_state(
1073 &self.state,
1074 turn_policy.clone(),
1075 effective_protocol_turn_options.clone(),
1076 ),
1077 messages,
1078 sessions: manager.state_service(),
1079 session_lifecycle: manager.lifecycle_service(),
1080 session_graph: manager.graph_service(),
1081 turn_context: turn_context.clone(),
1082 },
1083 self.turn_phase_probe.clone(),
1084 );
1085 tokio::pin!(prepare_turn);
1086
1087 loop {
1088 tokio::select! {
1089 prepared = &mut prepare_turn => {
1090 let prepared = prepared.map_err(|err| {
1091 RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
1092 })?;
1093 self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
1094 break prepared;
1095 }
1096 maybe_event = event_rx.recv() => {
1097 if let Some(event) = maybe_event {
1098 emit_runtime_stream_event_to_sinks(
1099 events,
1100 turn_events,
1101 event,
1102 &mut assembler,
1103 )
1104 .await;
1105 }
1106 }
1107 }
1108 }
1109 };
1110 for event in &prepared.events {
1111 assembler.push(event);
1112 }
1113 emit_session_events_to_sink(events, prepared.events).await;
1114 if let Some(abort) = prepared.abort {
1115 drop(event_tx);
1116
1117 let mut turn_pipeline = TurnBoundary::from_state(self.state.clone());
1118 turn_pipeline.apply_prepared_messages(&prepared.messages);
1119 let state = turn_pipeline.into_final_state();
1120 let issue = TurnIssue {
1121 kind: "plugin".to_string(),
1122 code: Some(abort.code),
1123 terminal_reason: None,
1124 message: abort.message.clone(),
1125 raw: None,
1126 };
1127 let error_event = SessionEvent::Error {
1128 message: abort.message,
1129 envelope: Some(crate::session_model::ErrorEnvelope {
1130 kind: "plugin".to_string(),
1131 code: issue.code.clone(),
1132 terminal_reason: None,
1133 user_message: issue.message.clone(),
1134 raw: None,
1135 }),
1136 };
1137 assembler.push(&error_event);
1138 emit_turn_activity_to_sink(
1139 turn_events,
1140 TurnActivity::independent(TurnEvent::Error {
1141 message: issue.message.clone(),
1142 }),
1143 )
1144 .await;
1145 emit_session_event_to_sink(events, error_event).await;
1146 let outcome_event = SessionEvent::TurnOutcome {
1147 outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
1148 };
1149 assembler.push(&outcome_event);
1150 emit_session_event_to_sink(events, outcome_event).await;
1151 assembler.push(&SessionEvent::Done);
1152 emit_session_event_to_sink(events, SessionEvent::Done).await;
1153 return Ok(assembler.finish(
1154 state.to_snapshot(),
1155 cancel.is_cancelled(),
1156 Some(issue),
1157 &self.host.core.control.termination,
1158 ));
1159 }
1160 let mut turn_pipeline = TurnBoundary::from_state(self.state.clone());
1161 let store = self
1162 .session
1163 .as_ref()
1164 .and_then(|session| session.history_store());
1165 turn_pipeline
1166 .prepared_checkpoint(
1167 store.as_ref().map(|store| store.as_ref()),
1168 turn_policy.clone(),
1169 turn_index,
1170 &prepared.messages,
1171 self.session.as_mut(),
1172 )
1173 .await
1174 .map_err(|err| {
1175 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
1176 })?;
1177 let resolved_turn_policy = if let Some(provider) = turn_provider_override {
1178 RuntimeSessionPolicy::from_provider(turn_policy.clone(), provider)
1179 .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1180 } else {
1181 self.host
1182 .resolve_session_policy(&self.state.session_id, turn_policy.clone())
1183 .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1184 };
1185 let manager = self
1186 .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1187 .map_err(|err| {
1188 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1189 })?;
1190 let cancel_state = cancel.clone();
1191 let finish_scoped_effect_controller = scoped_effect_controller.clone();
1192 let session = self
1193 .session
1194 .take()
1195 .expect("lash runtime session must be available");
1196 let mut driver = RuntimeTurnDriver {
1197 session,
1198 policy: resolved_turn_policy,
1199 host: self.host.clone(),
1200 turn_id: scoped_effect_controller.scope_id().to_string(),
1201 scoped_effect_controller,
1202 session_id: self.state.session_id.clone(),
1203 turn_index,
1204 turn_pipeline,
1205 llm_stream_summaries: HashMap::new(),
1206 next_llm_ordinal: 0,
1207 session_services: manager,
1208 protocol_turn_options: effective_protocol_turn_options,
1209 protocol_extension,
1210 turn_context,
1211 turn_causes: initial_turn_causes,
1212 pending_queue_claims: initial_queue_claim.into_iter().collect(),
1213 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1214 turn_phase_probe: self.turn_phase_probe.clone(),
1215 };
1216 let protocol_run_offset = 0;
1217 self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
1218 let run_result = drive_turn_to_completion(
1219 driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
1220 &mut event_rx,
1221 &mut assembler,
1222 &child_usage_event_relay,
1223 events,
1224 turn_events,
1225 )
1226 .await;
1227 let (new_messages, _new_protocol_iteration) = match run_result {
1228 Ok(result) => result,
1229 Err(err) => {
1230 self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1231 let RuntimeTurnDriver { session, .. } = driver;
1232 self.session = Some(session);
1233 return Err(err);
1234 }
1235 };
1236 self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1237 tracing::debug!(
1238 new_message_count = new_messages.len(),
1239 tool_call_count = assembler.tool_calls.len(),
1240 "runtime post-run_task"
1241 );
1242
1243 let RuntimeTurnDriver {
1244 session,
1245 policy,
1246 turn_pipeline,
1247 pending_queue_claims,
1248 ..
1249 } = driver;
1250 self.session = Some(session);
1251 self.finish_turn(
1252 TurnFinishInput {
1253 turn_pipeline,
1254 assembler,
1255 new_messages,
1256 policy,
1257 turn_index,
1258 queued_work_completions: pending_queue_claims
1259 .iter()
1260 .map(crate::QueuedWorkClaim::completion)
1261 .collect(),
1262 trace_turn_id,
1263 },
1264 events,
1265 &finish_scoped_effect_controller,
1266 &cancel_state,
1267 )
1268 .await
1269 }
1270 fn normalize_input_items(
1271 &self,
1272 items: &[InputItem],
1273 image_blobs: &HashMap<String, Vec<u8>>,
1274 ) -> Result<Vec<NormalizedItem>, String> {
1275 normalize_input_items(
1276 items,
1277 image_blobs,
1278 self.host.core.durability.attachment_store.as_ref(),
1279 )
1280 }
1281}
1282
1283fn frame_switch_task(turn: &AssembledTurn, frame_id: &str) -> Option<String> {
1284 turn.tool_calls
1285 .iter()
1286 .find_map(|record| match &record.output.control {
1287 Some(crate::ToolControl::SwitchAgentFrame {
1288 frame_id: control_frame_id,
1289 task: Some(task),
1290 ..
1291 }) if control_frame_id == frame_id => Some(task.clone()),
1292 _ => None,
1293 })
1294}
1295
1296fn turn_input_from_text(text: String) -> TurnInput {
1297 TurnInput {
1298 items: vec![InputItem::Text { text }],
1299 image_blobs: HashMap::new(),
1300 protocol_turn_options: None,
1301 trace_turn_id: None,
1302 protocol_extension: None,
1303 turn_context: crate::TurnContext::default(),
1304 }
1305}
1306
/// Derives the turn id for the next turn in an agent-frame follow chain.
///
/// When no turn has completed yet the root id is returned unchanged;
/// otherwise the id is `"{root_turn_id}:agent-frame:{completed_turn_count}"`.
fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
    match completed_turn_count {
        0 => root_turn_id.to_owned(),
        follow_index => format!("{root_turn_id}:agent-frame:{follow_index}"),
    }
}
1314
1315pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1316 if input.protocol_extension.is_some() {
1317 return Err(RuntimeError::new(
1318 RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1319 "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1320 ));
1321 }
1322 input
1323 .turn_context
1324 .live_plugin_inputs()
1325 .durable_effect_rejection()?;
1326 Ok(())
1327}
1328
1329async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1330 if !events.is_noop() {
1331 events.emit(activity).await;
1332 }
1333}
1334
/// Drives `run_future` to completion while forwarding the stream events it
/// produces.
///
/// While the future is pending, every event received on `event_rx` is passed
/// to `emit_runtime_stream_event_to_sinks`, which records it on `assembler`
/// and forwards it to `events` / `turn_events`. Once the future resolves, the
/// child usage relay is cleared, the events still buffered in the channel are
/// drained through the same path, and the future's result is returned
/// unchanged.
///
/// Statement order matters here: the drain loop only ends when `recv()`
/// yields `None`.
/// NOTE(review): presumably this is tokio's mpsc, where `None` means the
/// channel is closed and empty, so every sender must be gone before the drain
/// can finish - confirm that `run_future` and the relay hold the only
/// remaining senders.
async fn drive_turn_to_completion<F>(
    run_future: F,
    event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
    assembler: &mut TurnAssembler,
    child_usage_event_relay: &ChildUsageEventRelay,
    events: &dyn EventSink,
    turn_events: &dyn TurnActivitySink,
) -> Result<(crate::MessageSequence, usize), RuntimeError>
where
    F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
{
    // The inner block scopes the pinned future so that it is dropped (together
    // with anything it owns) before the drain loop below starts.
    let run_result = {
        tokio::pin!(run_future);
        loop {
            tokio::select! {
                maybe_event = event_rx.recv() => {
                    // A `None` here is ignored; the loop keeps polling until
                    // the run future itself completes.
                    if let Some(event) = maybe_event {
                        emit_runtime_stream_event_to_sinks(
                            events,
                            turn_events,
                            event,
                            assembler,
                        )
                        .await;
                    }
                }
                completed = &mut run_future => {
                    // NOTE(review): `clear()` presumably releases the relay's
                    // sender clone so the channel can close - confirm.
                    child_usage_event_relay.clear();
                    break completed;
                }
            }
        }
    };
    // Flush events that were queued but not yet received when the run finished.
    while let Some(event) = event_rx.recv().await {
        emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
    }
    run_result
}
1382
1383async fn emit_runtime_stream_event_to_sinks(
1384 events: &dyn EventSink,
1385 turn_events: &dyn TurnActivitySink,
1386 event: RuntimeStreamEvent,
1387 assembler: &mut TurnAssembler,
1388) {
1389 match event {
1390 RuntimeStreamEvent::Session(event) => {
1391 assembler.push(&event);
1392 emit_session_event_to_sink(events, event).await;
1393 }
1394 RuntimeStreamEvent::Turn(activity) => {
1395 assembler.push_turn_activity(&activity);
1396 emit_turn_activity_to_sink(turn_events, activity).await;
1397 }
1398 }
1399}
1400
#[cfg(test)]
mod tests {
    use super::agent_frame_follow_turn_id;

    /// The root id is reused for the first turn; later turns get a
    /// count-suffixed id derived from the same root.
    #[test]
    fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
        let expectations = [
            (0, "root-turn"),
            (1, "root-turn:agent-frame:1"),
            (2, "root-turn:agent-frame:2"),
        ];
        for (completed_turn_count, expected) in expectations {
            assert_eq!(
                agent_frame_follow_turn_id("root-turn", completed_turn_count),
                expected
            );
        }
    }
}