1use std::collections::{HashSet, VecDeque};
8use std::fmt::Debug;
9use std::sync::Arc;
10
11use serde::Serialize;
12use serde::de::DeserializeOwned;
13use serde_json::Value;
14
15use crate::llm::types::{
16 LlmAttachment, LlmOutputPart, LlmRequest, LlmResponse, LlmTerminalReason, LlmToolChoice,
17 LlmToolSpec, ProviderReplayMeta,
18};
19use crate::session_model::message::MessageOrigin;
20use crate::session_model::{
21 Message, MessageRole, MessageSequence, Part, PartKind, PruneState, SessionEvent,
22 SessionEventRecord, TokenUsage, ToolEvent, TurnTerminationPolicyState, make_error_event,
23 reassign_part_ids, render_prompt,
24};
25use crate::{
26 CheckpointKind, ModelToolReturn, PluginMessage, ToolCallOutput, TurnOutcome, TurnStop,
27};
28
29pub trait TurnProtocol: Send + Sync + 'static {
32 type Event: Clone + Serialize + DeserializeOwned + Debug + Send + Sync + 'static;
33 type Termination: Clone + Default + Debug + Send + Sync + 'static;
34 type DriverState: Clone + Default + Serialize + DeserializeOwned + Debug + Send + Sync + 'static;
35}
36
37#[derive(Clone, Debug, Serialize, serde::Deserialize)]
38pub struct UnitTurnProtocol;
39
40impl TurnProtocol for UnitTurnProtocol {
41 type Event = ();
42 type Termination = ();
43 type DriverState = serde_json::Value;
44}
45
46#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, serde::Deserialize)]
48pub struct EffectId(pub u64);
49
50#[derive(Clone, Debug, Serialize, serde::Deserialize)]
51pub struct PendingToolCall {
52 pub call_id: String,
53 pub tool_name: String,
54 pub args: Value,
55 pub replay: Option<ProviderReplayMeta>,
57}
58
59#[derive(Clone, Debug, Serialize, serde::Deserialize)]
60pub struct CompletedToolCall {
61 pub call_id: String,
62 pub tool_name: String,
63 pub args: Value,
64 pub output: ToolCallOutput,
65 pub model_return: ModelToolReturn,
66 pub duration_ms: u64,
67 pub replay: Option<ProviderReplayMeta>,
69}
70
71#[derive(Clone, Debug, PartialEq, Eq, Serialize, serde::Deserialize)]
72pub struct TurnCause {
73 pub id: String,
74 pub event_type: String,
75 pub origin: MessageOrigin,
76 pub text: String,
77}
78
79impl TurnCause {
80 pub fn to_event_message(&self) -> Message {
81 Message {
82 id: self.id.clone(),
83 role: MessageRole::Event,
84 parts: Arc::new(vec![Part {
85 id: format!("{}.p0", self.id),
86 kind: PartKind::Text,
87 content: self.text.clone(),
88 attachment: None,
89 tool_call_id: None,
90 tool_name: None,
91 tool_replay: None,
92 prune_state: PruneState::Intact,
93 reasoning_meta: None,
94 response_meta: None,
95 }]),
96 origin: Some(self.origin.clone()),
97 }
98 }
99}
100
101#[derive(Clone, Debug, Default, Serialize, serde::Deserialize)]
102pub struct CheckpointDelivery {
103 pub messages: Vec<PluginMessage>,
104 pub transient_messages: Vec<PluginMessage>,
105 pub turn_causes: Vec<TurnCause>,
106}
107
108pub fn render_turn_causes_prompt(causes: &[TurnCause]) -> Option<String> {
109 if causes.is_empty() {
110 return None;
111 }
112
113 let mut rendered = String::from("=== TURN EVENTS ===");
114 for (index, cause) in causes.iter().enumerate() {
115 rendered.push_str("\n\n");
116 rendered.push_str(&format!(
117 "--- event[{index}] · {} · {} ---\n",
118 cause.event_type, cause.id
119 ));
120 rendered.push_str("Origin: ");
121 rendered.push_str(&render_message_origin(&cause.origin));
122 rendered.push_str("\n\n");
123 rendered.push_str(cause.text.trim());
124 }
125 Some(rendered)
126}
127
128fn render_message_origin(origin: &MessageOrigin) -> String {
129 match origin {
130 MessageOrigin::Plugin {
131 plugin_id,
132 transient,
133 } => {
134 if *transient {
135 format!("plugin {plugin_id} (transient)")
136 } else {
137 format!("plugin {plugin_id}")
138 }
139 }
140 MessageOrigin::Process {
141 process_id,
142 event_type,
143 sequence,
144 wake_id,
145 ..
146 } => match wake_id {
147 Some(wake_id) => {
148 format!("process {process_id} {event_type} #{sequence} ({wake_id})")
149 }
150 None => format!("process {process_id} {event_type} #{sequence}"),
151 },
152 }
153}
154
155#[derive(Clone, Debug, Serialize, serde::Deserialize)]
156pub enum LogEvent {
157 LlmDebug {
158 session_id: String,
159 protocol_iteration: usize,
160 usage: TokenUsage,
161 provider_usage: Option<Value>,
162 request_body: Option<String>,
163 response_text: String,
164 response_parts: Option<Value>,
165 },
166 LlmError {
167 session_id: String,
168 protocol_iteration: usize,
169 request_body: Option<String>,
170 message: String,
171 retryable: bool,
172 raw: Option<String>,
173 code: Option<String>,
174 terminal_reason: LlmTerminalReason,
175 },
176}
177
178#[derive(Debug, Serialize, serde::Deserialize)]
185#[allow(clippy::large_enum_variant)]
186pub enum Effect<M: TurnProtocol = UnitTurnProtocol> {
187 SyncExecutionSurface {
195 id: EffectId,
196 update_machine_config: bool,
197 },
198 LlmCall {
200 id: EffectId,
201 request: Arc<LlmRequest>,
202 },
203 CancelLlm { id: EffectId },
205 ToolCalls {
207 id: EffectId,
208 calls: Vec<PendingToolCall>,
209 },
210 ExecCode { id: EffectId, code: String },
212 Checkpoint {
214 id: EffectId,
215 checkpoint: CheckpointKind,
216 },
217 Log { event: LogEvent },
219 Emit(SessionEvent),
221 Progress {
227 messages: MessageSequence,
228 event_delta: Vec<SessionEventRecord<M::Event>>,
229 protocol_iteration: usize,
230 },
231 Done {
233 messages: MessageSequence,
234 event_delta: Vec<SessionEventRecord<M::Event>>,
235 protocol_iteration: usize,
236 },
237}
238
239impl<M: TurnProtocol> Clone for Effect<M> {
240 fn clone(&self) -> Self {
241 match self {
242 Self::SyncExecutionSurface {
243 id,
244 update_machine_config,
245 } => Self::SyncExecutionSurface {
246 id: *id,
247 update_machine_config: *update_machine_config,
248 },
249 Self::LlmCall { id, request } => Self::LlmCall {
250 id: *id,
251 request: Arc::clone(request),
252 },
253 Self::CancelLlm { id } => Self::CancelLlm { id: *id },
254 Self::ToolCalls { id, calls } => Self::ToolCalls {
255 id: *id,
256 calls: calls.clone(),
257 },
258 Self::ExecCode { id, code } => Self::ExecCode {
259 id: *id,
260 code: code.clone(),
261 },
262 Self::Checkpoint { id, checkpoint } => Self::Checkpoint {
263 id: *id,
264 checkpoint: *checkpoint,
265 },
266 Self::Log { event } => Self::Log {
267 event: event.clone(),
268 },
269 Self::Emit(event) => Self::Emit(event.clone()),
270 Self::Progress {
271 messages,
272 event_delta,
273 protocol_iteration,
274 } => Self::Progress {
275 messages: messages.clone(),
276 event_delta: event_delta.clone(),
277 protocol_iteration: *protocol_iteration,
278 },
279 Self::Done {
280 messages,
281 event_delta,
282 protocol_iteration,
283 } => Self::Done {
284 messages: messages.clone(),
285 event_delta: event_delta.clone(),
286 protocol_iteration: *protocol_iteration,
287 },
288 }
289 }
290}
291
292impl<M: TurnProtocol> Effect<M> {
293 fn id(&self) -> Option<EffectId> {
294 match self {
295 Self::SyncExecutionSurface { id, .. }
296 | Self::LlmCall { id, .. }
297 | Self::CancelLlm { id }
298 | Self::ToolCalls { id, .. }
299 | Self::ExecCode { id, .. }
300 | Self::Checkpoint { id, .. } => Some(*id),
301 Self::Log { .. } | Self::Emit(_) | Self::Progress { .. } | Self::Done { .. } => None,
302 }
303 }
304}
305
306#[derive(Clone, Debug, Serialize, serde::Deserialize)]
308pub struct LlmCallError {
309 pub message: String,
310 pub retryable: bool,
311 pub raw: Option<String>,
312 pub code: Option<String>,
313 pub terminal_reason: LlmTerminalReason,
314 pub request_body: Option<String>,
315}
316
317pub enum Response {
319 ExecutionSurfaceSynced {
321 id: EffectId,
322 result: Result<Option<ExecutionSurfaceSync>, String>,
323 },
324 LlmComplete {
326 id: EffectId,
327 result: Result<LlmResponse, LlmCallError>,
328 text_streamed: bool,
331 },
332 ToolResults {
334 id: EffectId,
335 results: Vec<CompletedToolCall>,
336 },
337 ExecResult {
339 id: EffectId,
340 result: Result<crate::ExecResponse, String>,
341 },
342 Checkpoint {
344 id: EffectId,
345 delivery: CheckpointDelivery,
346 },
347}
348
349#[derive(Clone, Debug, Serialize, serde::Deserialize)]
350pub struct ExecutionSurfaceSync {
351 pub system_prompt: Arc<str>,
352 pub tool_specs: Arc<Vec<LlmToolSpec>>,
353}
354
355pub struct WaitingLlmState<M: TurnProtocol = UnitTurnProtocol> {
356 pub request: Arc<LlmRequest>,
357 driver_state: Option<M::DriverState>,
358}
359
360impl<M: TurnProtocol> WaitingLlmState<M> {
361 pub fn take_driver_state(&mut self) -> Option<M::DriverState> {
362 self.driver_state.take()
363 }
364}
365
366pub struct WaitingExecState<M: TurnProtocol = UnitTurnProtocol> {
367 driver_state: M::DriverState,
368}
369
370impl<M: TurnProtocol> WaitingExecState<M> {
371 pub fn into_driver_state(self) -> M::DriverState {
372 self.driver_state
373 }
374}
375
376#[derive(Clone, Debug, PartialEq, Serialize, serde::Deserialize)]
377pub enum CheckpointResumeAction {
378 PrepareIteration,
379 Finish(TurnOutcome),
380}
381
382#[allow(clippy::large_enum_variant)]
383pub enum DriverAction<M: TurnProtocol = UnitTurnProtocol> {
384 Emit(SessionEvent),
385 AppendEvents(Vec<SessionEventRecord<M::Event>>),
386 StartLlm {
387 request: Arc<LlmRequest>,
388 driver_state: Option<M::DriverState>,
389 },
390 StartTools {
391 calls: Vec<PendingToolCall>,
392 },
393 StartExec {
394 code: String,
395 driver_state: M::DriverState,
396 },
397 StartCheckpoint {
398 checkpoint: CheckpointKind,
399 on_empty: CheckpointResumeAction,
400 },
401 AdvanceProtocolIteration,
402 ScheduleTurnLimitFinal {
403 message: Message,
404 },
405 Finish(TurnOutcome),
406}
407
408pub struct DriverContextView<'a, M: TurnProtocol = UnitTurnProtocol> {
409 config: &'a TurnMachineConfig<M>,
410 messages: &'a MessageSequence,
411 events: &'a [SessionEventRecord<M::Event>],
412 turn_causes: &'a [TurnCause],
413 protocol_iteration: usize,
414 protocol_run_offset: usize,
415 termination: &'a TurnTerminationPolicyState,
416}
417
418impl<'a, M: TurnProtocol> DriverContextView<'a, M> {
419 pub fn project_llm_request(&self, use_tools: bool) -> Arc<LlmRequest> {
420 self.config.projector.project(ProjectorContext {
421 config: self.config,
422 messages: self.messages,
423 events: self.events,
424 turn_causes: self.turn_causes,
425 protocol_iteration: self.protocol_iteration,
426 use_tools,
427 })
428 }
429
430 pub fn protocol_iteration(&self) -> usize {
431 self.protocol_iteration
432 }
433
434 pub fn protocol_run_offset(&self) -> usize {
435 self.protocol_run_offset
436 }
437
438 pub fn max_turns(&self) -> Option<usize> {
439 self.config.max_turns
440 }
441
442 pub fn termination(&self) -> &M::Termination {
443 &self.config.termination
444 }
445
446 pub fn autonomous(&self) -> bool {
447 self.config.autonomous
448 }
449
450 pub fn should_force_exit_after_grace_turn(&self) -> bool {
451 self.termination.should_force_exit_after_grace_turn()
452 }
453
454 pub fn turn_limit_final_to_schedule(&self) -> Option<usize> {
455 self.termination.turn_limit_final_to_schedule(
456 self.protocol_iteration,
457 self.protocol_run_offset,
458 self.config.max_turns,
459 )
460 }
461
462 pub fn messages(&self) -> &MessageSequence {
463 self.messages
464 }
465
466 pub fn events(&self) -> &[SessionEventRecord<M::Event>] {
467 self.events
468 }
469
470 pub fn turn_causes(&self) -> &[TurnCause] {
471 self.turn_causes
472 }
473}
474
475pub struct ProjectorContext<'a, M: TurnProtocol = UnitTurnProtocol> {
476 pub config: &'a TurnMachineConfig<M>,
477 pub messages: &'a MessageSequence,
478 pub events: &'a [SessionEventRecord<M::Event>],
479 pub turn_causes: &'a [TurnCause],
480 pub protocol_iteration: usize,
481 pub use_tools: bool,
482}
483
484pub trait ContextProjector<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
485 fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest>;
486}
487
488#[derive(Clone, Debug, Default)]
489pub struct ChatContextProjector;
490
491impl<M: TurnProtocol> ContextProjector<M> for ChatContextProjector {
492 fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest> {
493 let rendered_prompt = render_messages_for_projector(ctx.messages, ctx.turn_causes);
494 let attachments: Vec<LlmAttachment> = rendered_prompt.attachments;
495 let mut messages = rendered_prompt.messages;
496 if let Some(turn_events) = render_turn_causes_prompt(ctx.turn_causes) {
497 messages.push(crate::llm::types::LlmMessage::text(
498 crate::llm::types::LlmRole::User,
499 Arc::from(turn_events),
500 ));
501 }
502 if !ctx.config.system_prompt.trim().is_empty() {
503 messages.insert(
504 0,
505 crate::llm::types::LlmMessage::text(
506 crate::llm::types::LlmRole::System,
507 Arc::clone(&ctx.config.system_prompt),
508 ),
509 );
510 }
511
512 Arc::new(LlmRequest {
513 model: ctx.config.model.clone(),
514 messages,
515 attachments,
516 tools: if ctx.use_tools {
517 Arc::clone(&ctx.config.tool_specs)
518 } else {
519 Arc::new(Vec::new())
520 },
521 tool_choice: if ctx.use_tools {
522 LlmToolChoice::Auto
523 } else {
524 LlmToolChoice::None
525 },
526 model_variant: ctx.config.model_variant.clone(),
527 generation: ctx.config.generation.clone(),
528 session_id: ctx.config.run_session_id.clone(),
529 output_spec: None,
530 stream_events: None,
531 provider_trace: None,
532 })
533 }
534}
535
536fn render_messages_for_projector(
537 messages: &MessageSequence,
538 turn_causes: &[TurnCause],
539) -> crate::RenderedPrompt {
540 if turn_causes.is_empty() {
541 return messages.render_prompt();
542 }
543
544 let active_cause_ids = turn_causes
545 .iter()
546 .map(|cause| cause.id.as_str())
547 .collect::<HashSet<_>>();
548 let filtered = messages
549 .iter()
550 .filter(|message| {
551 !(matches!(message.role, MessageRole::Event)
552 && active_cause_ids.contains(message.id.as_str()))
553 })
554 .cloned()
555 .collect::<Vec<_>>();
556 render_prompt(filtered.as_slice())
557}
558
559pub trait ProtocolDriverHandle<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
560 fn prepare_protocol_iteration(&self, ctx: DriverContextView<'_, M>) -> Vec<DriverAction<M>>;
561 fn handle_llm_success(
562 &self,
563 ctx: DriverContextView<'_, M>,
564 waiting: WaitingLlmState<M>,
565 llm_response: LlmResponse,
566 text_streamed: bool,
567 ) -> Vec<DriverAction<M>>;
568 fn handle_tool_results(
569 &self,
570 ctx: DriverContextView<'_, M>,
571 completed: Vec<CompletedToolCall>,
572 ) -> Vec<DriverAction<M>>;
573 fn handle_exec_result(
574 &self,
575 ctx: DriverContextView<'_, M>,
576 waiting: WaitingExecState<M>,
577 result: Result<crate::ExecResponse, String>,
578 ) -> Vec<DriverAction<M>>;
579}
580
581pub struct TurnMachineConfig<M: TurnProtocol = UnitTurnProtocol> {
583 pub protocol_driver: Arc<dyn ProtocolDriverHandle<M>>,
584 pub projector: Arc<dyn ContextProjector<M>>,
585 pub sync_execution_surface: bool,
586 pub model: String,
587 pub max_context_tokens: Option<usize>,
592 pub max_turns: Option<usize>,
593 pub model_variant: Option<String>,
594 pub generation: crate::llm::types::GenerationOptions,
595 pub run_session_id: Option<String>,
596 pub autonomous: bool,
597 pub tool_specs: Arc<Vec<LlmToolSpec>>,
598 pub system_prompt: Arc<str>,
599 pub session_id: String,
600 pub emit_llm_trace: bool,
601 pub termination: M::Termination,
602 pub turn_limit_final_message: crate::TurnLimitFinalMessage,
603}
604
605#[derive(Debug, Serialize, serde::Deserialize)]
608enum MachineState<M: TurnProtocol = UnitTurnProtocol> {
609 PreparingProtocol,
610 WaitingExecutionSurface {
611 effect_id: EffectId,
612 update_machine_config: bool,
613 },
614 PrepareIteration,
615 WaitingLlm {
616 effect_id: EffectId,
617 request: Arc<LlmRequest>,
618 driver_state: Option<M::DriverState>,
619 },
620 WaitingTools {
621 effect_id: EffectId,
622 calls: Vec<PendingToolCall>,
623 },
624 WaitingExec {
625 effect_id: EffectId,
626 code: String,
627 driver_state: M::DriverState,
628 },
629 WaitingCheckpoint {
630 effect_id: EffectId,
631 checkpoint: CheckpointKind,
632 on_empty: CheckpointResumeAction,
633 },
634 Finished,
635}
636
637#[derive(Clone, Debug, Serialize, serde::Deserialize)]
638pub struct TurnCheckpoint<M: TurnProtocol = UnitTurnProtocol> {
639 state: MachineState<M>,
640 pending_effects: Vec<Effect<M>>,
641 next_effect_id: u64,
642 #[serde(default)]
643 next_synthetic_message_id: u64,
644 messages: Vec<Message>,
645 events: Vec<SessionEventRecord<M::Event>>,
646 #[serde(default)]
647 turn_causes: Vec<TurnCause>,
648 #[serde(default)]
649 progress_event_cursor: usize,
650 protocol_iteration: usize,
651 protocol_run_offset: usize,
652 cumulative_usage: TokenUsage,
653 termination: TurnTerminationPolicyState,
654 synced_protocol_iteration: Option<usize>,
655}
656
657impl<M: TurnProtocol> Clone for MachineState<M> {
658 fn clone(&self) -> Self {
659 match self {
660 Self::PreparingProtocol => Self::PreparingProtocol,
661 Self::WaitingExecutionSurface {
662 effect_id,
663 update_machine_config,
664 } => Self::WaitingExecutionSurface {
665 effect_id: *effect_id,
666 update_machine_config: *update_machine_config,
667 },
668 Self::PrepareIteration => Self::PrepareIteration,
669 Self::WaitingLlm {
670 effect_id,
671 request,
672 driver_state,
673 } => Self::WaitingLlm {
674 effect_id: *effect_id,
675 request: Arc::clone(request),
676 driver_state: driver_state.clone(),
677 },
678 Self::WaitingTools { effect_id, calls } => Self::WaitingTools {
679 effect_id: *effect_id,
680 calls: calls.clone(),
681 },
682 Self::WaitingExec {
683 effect_id,
684 code,
685 driver_state,
686 } => Self::WaitingExec {
687 effect_id: *effect_id,
688 code: code.clone(),
689 driver_state: driver_state.clone(),
690 },
691 Self::WaitingCheckpoint {
692 effect_id,
693 checkpoint,
694 on_empty,
695 } => Self::WaitingCheckpoint {
696 effect_id: *effect_id,
697 checkpoint: *checkpoint,
698 on_empty: on_empty.clone(),
699 },
700 Self::Finished => Self::Finished,
701 }
702 }
703}
704
705impl<M: TurnProtocol> MachineState<M> {
706 fn outstanding_effect_id(&self) -> Option<EffectId> {
707 match self {
708 Self::WaitingExecutionSurface { effect_id, .. }
709 | Self::WaitingLlm { effect_id, .. }
710 | Self::WaitingTools { effect_id, .. }
711 | Self::WaitingExec { effect_id, .. }
712 | Self::WaitingCheckpoint { effect_id, .. } => Some(*effect_id),
713 Self::PreparingProtocol | Self::PrepareIteration | Self::Finished => None,
714 }
715 }
716
717 fn outstanding_effect(&self) -> Option<Effect<M>> {
718 match self {
719 Self::WaitingExecutionSurface {
720 effect_id,
721 update_machine_config,
722 } => Some(Effect::SyncExecutionSurface {
723 id: *effect_id,
724 update_machine_config: *update_machine_config,
725 }),
726 Self::WaitingLlm {
727 effect_id, request, ..
728 } => Some(Effect::LlmCall {
729 id: *effect_id,
730 request: Arc::clone(request),
731 }),
732 Self::WaitingTools { effect_id, calls } => Some(Effect::ToolCalls {
733 id: *effect_id,
734 calls: calls.clone(),
735 }),
736 Self::WaitingExec {
737 effect_id, code, ..
738 } => Some(Effect::ExecCode {
739 id: *effect_id,
740 code: code.clone(),
741 }),
742 Self::WaitingCheckpoint {
743 effect_id,
744 checkpoint,
745 ..
746 } => Some(Effect::Checkpoint {
747 id: *effect_id,
748 checkpoint: *checkpoint,
749 }),
750 Self::PreparingProtocol | Self::PrepareIteration | Self::Finished => None,
751 }
752 }
753}
754
755pub struct TurnMachine<M: TurnProtocol = UnitTurnProtocol> {
757 config: TurnMachineConfig<M>,
758 state: MachineState<M>,
759 pending_effects: VecDeque<Effect<M>>,
760 active_effect_redelivery: bool,
761 next_effect_id: u64,
762 next_synthetic_message_id: u64,
763 messages: MessageSequence,
764 events: Arc<Vec<SessionEventRecord<M::Event>>>,
765 turn_causes: Vec<TurnCause>,
766 progress_event_cursor: usize,
767 protocol_iteration: usize,
768 protocol_run_offset: usize,
769 cumulative_usage: TokenUsage,
770 termination: TurnTerminationPolicyState,
771 synced_protocol_iteration: Option<usize>,
772}
773
774impl<M: TurnProtocol> TurnMachine<M> {
775 pub fn new(
777 config: TurnMachineConfig<M>,
778 messages: Vec<Message>,
779 events: Arc<Vec<SessionEventRecord<M::Event>>>,
780 protocol_run_offset: usize,
781 ) -> Self {
782 Self::new_shared(
783 config,
784 MessageSequence::from_owned(messages),
785 events,
786 protocol_run_offset,
787 )
788 }
789
790 pub fn new_shared(
791 config: TurnMachineConfig<M>,
792 messages: MessageSequence,
793 events: Arc<Vec<SessionEventRecord<M::Event>>>,
794 protocol_run_offset: usize,
795 ) -> Self {
796 Self::new_shared_with_turn_causes(config, messages, events, protocol_run_offset, Vec::new())
797 }
798
799 pub fn new_shared_with_turn_causes(
800 config: TurnMachineConfig<M>,
801 messages: MessageSequence,
802 events: Arc<Vec<SessionEventRecord<M::Event>>>,
803 protocol_run_offset: usize,
804 turn_causes: Vec<TurnCause>,
805 ) -> Self {
806 let next_synthetic_message_id = messages.len() as u64;
807 Self {
808 config,
809 state: MachineState::PreparingProtocol,
810 pending_effects: VecDeque::new(),
811 active_effect_redelivery: false,
812 next_effect_id: 1,
813 next_synthetic_message_id,
814 messages,
815 progress_event_cursor: events.len(),
816 events,
817 turn_causes,
818 protocol_iteration: protocol_run_offset,
819 protocol_run_offset,
820 cumulative_usage: TokenUsage::default(),
821 termination: TurnTerminationPolicyState::new(),
822 synced_protocol_iteration: None,
823 }
824 }
825
826 pub fn is_done(&self) -> bool {
828 matches!(self.state, MachineState::Finished)
829 }
830
831 pub fn messages(&self) -> Arc<Vec<Message>> {
832 self.messages.shared()
833 }
834
835 pub fn events(&self) -> Arc<Vec<SessionEventRecord<M::Event>>> {
836 Arc::clone(&self.events)
837 }
838
839 pub fn message_sequence(&self) -> MessageSequence {
840 self.messages.clone()
841 }
842
843 pub fn protocol_iteration(&self) -> usize {
844 self.protocol_iteration
845 }
846
847 pub fn checkpoint(&self) -> TurnCheckpoint<M> {
848 let active_effect_id = self.state.outstanding_effect_id();
849 let pending_effects = self
850 .pending_effects
851 .iter()
852 .filter(|effect| active_effect_id.is_none_or(|id| effect.id() != Some(id)))
853 .cloned()
854 .collect::<Vec<_>>();
855 TurnCheckpoint {
856 state: self.state.clone(),
857 pending_effects,
858 next_effect_id: self.next_effect_id,
859 next_synthetic_message_id: self.next_synthetic_message_id,
860 messages: self.messages.iter().cloned().collect(),
861 events: self.events.as_ref().clone(),
862 turn_causes: self.turn_causes.clone(),
863 progress_event_cursor: self.progress_event_cursor,
864 protocol_iteration: self.protocol_iteration,
865 protocol_run_offset: self.protocol_run_offset,
866 cumulative_usage: self.cumulative_usage.clone(),
867 termination: self.termination.clone(),
868 synced_protocol_iteration: self.synced_protocol_iteration,
869 }
870 }
871
872 pub fn restore_from_checkpoint(
873 config: TurnMachineConfig<M>,
874 checkpoint: TurnCheckpoint<M>,
875 ) -> Self {
876 let active_effect_id = checkpoint.state.outstanding_effect_id();
877 let pending_effects = checkpoint
878 .pending_effects
879 .into_iter()
880 .collect::<VecDeque<_>>();
881 let active_effect_redelivery = active_effect_id.is_some()
882 && !pending_effects
883 .iter()
884 .any(|effect| effect.id() == active_effect_id);
885 Self {
886 config,
887 state: checkpoint.state,
888 pending_effects,
889 active_effect_redelivery,
890 next_effect_id: checkpoint.next_effect_id,
891 next_synthetic_message_id: checkpoint.next_synthetic_message_id,
892 messages: MessageSequence::from_owned(checkpoint.messages),
893 events: Arc::new(checkpoint.events),
894 turn_causes: checkpoint.turn_causes,
895 progress_event_cursor: checkpoint.progress_event_cursor,
896 protocol_iteration: checkpoint.protocol_iteration,
897 protocol_run_offset: checkpoint.protocol_run_offset,
898 cumulative_usage: checkpoint.cumulative_usage,
899 termination: checkpoint.termination,
900 synced_protocol_iteration: checkpoint.synced_protocol_iteration,
901 }
902 }
903
904 fn driver_context(&self) -> DriverContextView<'_, M> {
905 DriverContextView {
906 config: &self.config,
907 messages: &self.messages,
908 events: self.events.as_slice(),
909 turn_causes: &self.turn_causes,
910 protocol_iteration: self.protocol_iteration,
911 protocol_run_offset: self.protocol_run_offset,
912 termination: &self.termination,
913 }
914 }
915
916 fn next_id(&mut self) -> EffectId {
917 let id = EffectId(self.next_effect_id);
918 self.next_effect_id += 1;
919 id
920 }
921
922 fn next_synthetic_message_id(&mut self, scope: &str) -> String {
923 let id = format!(
924 "m_sansio_{}_{}_{}",
925 self.protocol_run_offset, scope, self.next_synthetic_message_id
926 );
927 self.next_synthetic_message_id += 1;
928 id
929 }
930
931 fn emit(&mut self, event: SessionEvent) {
932 self.pending_effects.push_back(Effect::Emit(event));
933 }
934
935 fn emit_progress(&mut self) {
936 let event_delta = self.next_event_delta();
937 self.pending_effects.push_back(Effect::Progress {
938 messages: self.messages.clone(),
939 event_delta,
940 protocol_iteration: self.protocol_iteration,
941 });
942 }
943
944 pub fn fail_turn(&mut self, event: SessionEvent) {
945 self.emit(event);
946 self.finish(TurnOutcome::Stopped(TurnStop::RuntimeError));
947 }
948
949 pub fn finish_with_outcome(&mut self, outcome: TurnOutcome) {
950 self.finish(outcome);
951 }
952
953 fn finish(&mut self, outcome: TurnOutcome) {
954 self.emit(SessionEvent::TurnOutcome { outcome });
955 self.emit(SessionEvent::Done);
956 let msgs = std::mem::take(&mut self.messages);
957 let event_delta = self.next_event_delta();
958 let protocol_iteration = self.protocol_iteration;
959 self.state = MachineState::Finished;
960 self.pending_effects.push_back(Effect::Done {
961 messages: msgs,
962 event_delta,
963 protocol_iteration,
964 });
965 }
966
967 fn next_event_delta(&mut self) -> Vec<SessionEventRecord<M::Event>> {
968 if self.progress_event_cursor >= self.events.len() {
969 self.progress_event_cursor = self.events.len();
970 return Vec::new();
971 }
972 let delta = self.events[self.progress_event_cursor..].to_vec();
973 self.progress_event_cursor = self.events.len();
974 delta
975 }
976
977 pub fn poll_effect(&mut self) -> Option<Effect<M>> {
980 if let Some(effect) = self.pending_effects.pop_front() {
981 return Some(effect);
982 }
983 if self.active_effect_redelivery {
984 self.active_effect_redelivery = false;
985 if let Some(effect) = self.state.outstanding_effect() {
986 return Some(effect);
987 }
988 }
989
990 match &self.state {
991 MachineState::PreparingProtocol => {
992 self.prepare_protocol();
993 self.pending_effects.pop_front()
994 }
995 MachineState::PrepareIteration => {
996 self.prepare_protocol_iteration();
997 self.pending_effects.pop_front()
998 }
999 _ => None,
1000 }
1001 }
1002
1003 fn prepare_protocol(&mut self) {
1006 if self.config.sync_execution_surface {
1007 let id = self.next_id();
1008 self.state = MachineState::WaitingExecutionSurface {
1009 effect_id: id,
1010 update_machine_config: false,
1011 };
1012 self.pending_effects
1013 .push_back(Effect::SyncExecutionSurface {
1014 id,
1015 update_machine_config: false,
1016 });
1017 return;
1018 }
1019
1020 self.prepare_protocol_iteration();
1021 }
1022
1023 fn prepare_protocol_iteration(&mut self) {
1024 if self.config.sync_execution_surface
1025 && self.synced_protocol_iteration != Some(self.protocol_iteration)
1026 {
1027 let id = self.next_id();
1028 self.state = MachineState::WaitingExecutionSurface {
1029 effect_id: id,
1030 update_machine_config: true,
1031 };
1032 self.pending_effects
1033 .push_back(Effect::SyncExecutionSurface {
1034 id,
1035 update_machine_config: true,
1036 });
1037 return;
1038 }
1039 let actions = {
1040 let driver = Arc::clone(&self.config.protocol_driver);
1041 let ctx = self.driver_context();
1042 driver.prepare_protocol_iteration(ctx)
1043 };
1044 self.apply_actions(actions);
1045 }
1046
1047 fn start_llm_request(
1048 &mut self,
1049 request: Arc<LlmRequest>,
1050 driver_state: Option<M::DriverState>,
1051 ) {
1052 let tool_list = self
1053 .config
1054 .tool_specs
1055 .iter()
1056 .map(|tool| tool.name.as_str())
1057 .collect::<Vec<_>>()
1058 .join(", ");
1059 self.emit(SessionEvent::LlmRequest {
1060 protocol_iteration: self.protocol_iteration,
1061 message_count: self.messages.len(),
1062 tool_list,
1063 });
1064
1065 let id = self.next_id();
1066 self.state = MachineState::WaitingLlm {
1067 effect_id: id,
1068 request: Arc::clone(&request),
1069 driver_state,
1070 };
1071 self.pending_effects
1072 .push_back(Effect::LlmCall { id, request });
1073 }
1074
1075 fn start_tool_calls(&mut self, calls: Vec<PendingToolCall>) {
1076 let effect_id = self.next_id();
1077 self.state = MachineState::WaitingTools {
1078 effect_id,
1079 calls: calls.clone(),
1080 };
1081 self.pending_effects.push_back(Effect::ToolCalls {
1082 id: effect_id,
1083 calls,
1084 });
1085 }
1086
1087 fn start_exec(&mut self, code: String, driver_state: M::DriverState) {
1088 let effect_id = self.next_id();
1089 self.state = MachineState::WaitingExec {
1090 effect_id,
1091 code: code.clone(),
1092 driver_state,
1093 };
1094 self.pending_effects.push_back(Effect::ExecCode {
1095 id: effect_id,
1096 code,
1097 });
1098 }
1099
1100 fn schedule_turn_limit_final(&mut self, message: Message) -> bool {
1101 let Some(_max_turns) = self.termination.turn_limit_final_to_schedule(
1102 self.protocol_iteration,
1103 self.protocol_run_offset,
1104 self.config.max_turns,
1105 ) else {
1106 return false;
1107 };
1108 self.termination.mark_turn_limit_final_scheduled();
1109 self.messages.push(message);
1110 true
1111 }
1112
1113 fn schedule_configured_turn_limit_final(&mut self) -> bool {
1114 let Some(max_turns) = self.termination.turn_limit_final_to_schedule(
1115 self.protocol_iteration,
1116 self.protocol_run_offset,
1117 self.config.max_turns,
1118 ) else {
1119 return false;
1120 };
1121 let message_id = self.next_synthetic_message_id("turn_limit");
1122 let message = (self.config.turn_limit_final_message)(message_id, max_turns);
1123 self.termination.mark_turn_limit_final_scheduled();
1124 self.messages.push(message);
1125 true
1126 }
1127
1128 fn append_event(&mut self, event: SessionEventRecord<M::Event>) {
1129 match event {
1130 SessionEventRecord::Conversation(record) => {
1131 Arc::make_mut(&mut self.events)
1132 .push(SessionEventRecord::Conversation(record.clone()));
1133 self.messages.push(record.to_message());
1134 }
1135 SessionEventRecord::Tool(ToolEvent::Invocation { stable_key, record }) => {
1136 Arc::make_mut(&mut self.events).push(SessionEventRecord::Tool(
1137 ToolEvent::Invocation { stable_key, record },
1138 ));
1139 }
1140 SessionEventRecord::Protocol(protocol_event) => {
1141 Arc::make_mut(&mut self.events).push(SessionEventRecord::Protocol(protocol_event));
1142 }
1143 }
1144 }
1145
1146 pub fn apply_actions(&mut self, actions: Vec<DriverAction<M>>) {
1147 let mut progress_dirty = false;
1148 for action in actions {
1149 match action {
1150 DriverAction::Emit(event) => self.emit(event),
1151 DriverAction::AppendEvents(events) => {
1152 if !events.is_empty() {
1153 for event in events {
1154 self.append_event(event);
1155 }
1156 progress_dirty = true;
1157 }
1158 }
1159 DriverAction::StartLlm {
1160 request,
1161 driver_state,
1162 } => self.start_llm_request(request, driver_state),
1163 DriverAction::StartTools { calls } => self.start_tool_calls(calls),
1164 DriverAction::StartExec { code, driver_state } => {
1165 self.start_exec(code, driver_state)
1166 }
1167 DriverAction::StartCheckpoint {
1168 checkpoint,
1169 on_empty,
1170 } => self.request_checkpoint(checkpoint, on_empty),
1171 DriverAction::AdvanceProtocolIteration => {
1172 self.protocol_iteration += 1;
1173 self.synced_protocol_iteration = None;
1174 progress_dirty = true;
1175 }
1176 DriverAction::ScheduleTurnLimitFinal { message } => {
1177 if self.schedule_turn_limit_final(message) {
1178 progress_dirty = true;
1179 }
1180 }
1181 DriverAction::Finish(outcome) => {
1182 if progress_dirty {
1183 self.emit_progress();
1184 progress_dirty = false;
1185 }
1186 self.finish(outcome);
1187 break;
1188 }
1189 }
1190 }
1191 if progress_dirty {
1192 self.emit_progress();
1193 }
1194 }
1195
1196 pub fn handle_response(&mut self, response: Response) {
1198 self.active_effect_redelivery = false;
1199 match response {
1200 Response::ExecutionSurfaceSynced { id, result } => {
1201 self.handle_execution_surface_synced(id, result)
1202 }
1203 Response::LlmComplete {
1204 id,
1205 result,
1206 text_streamed,
1207 } => self.handle_llm_complete(id, result, text_streamed),
1208 Response::ToolResults { id, results } => self.handle_tool_results(id, results),
1209 Response::ExecResult { id, result } => self.handle_exec_result(id, result),
1210 Response::Checkpoint { id, delivery } => self.handle_checkpoint(id, delivery),
1211 }
1212 }
1213
1214 fn request_checkpoint(&mut self, checkpoint: CheckpointKind, on_empty: CheckpointResumeAction) {
1215 let id = self.next_id();
1216 self.state = MachineState::WaitingCheckpoint {
1217 effect_id: id,
1218 checkpoint,
1219 on_empty,
1220 };
1221 self.pending_effects
1222 .push_back(Effect::Checkpoint { id, checkpoint });
1223 }
1224
1225 fn handle_execution_surface_synced(
1226 &mut self,
1227 id: EffectId,
1228 result: Result<Option<ExecutionSurfaceSync>, String>,
1229 ) {
1230 let (waiting_id, waiting_update_machine_config) =
1231 match std::mem::replace(&mut self.state, MachineState::Finished) {
1232 MachineState::WaitingExecutionSurface {
1233 effect_id,
1234 update_machine_config,
1235 } => (effect_id, update_machine_config),
1236 other => {
1237 self.state = other;
1238 return;
1239 }
1240 };
1241 if waiting_id != id {
1242 self.state = MachineState::WaitingExecutionSurface {
1243 effect_id: waiting_id,
1244 update_machine_config: waiting_update_machine_config,
1245 };
1246 return;
1247 }
1248
1249 match result {
1250 Ok(update) => {
1251 if let Some(update) = update {
1252 self.config.system_prompt = update.system_prompt;
1253 self.config.tool_specs = update.tool_specs;
1254 }
1255 self.synced_protocol_iteration = Some(self.protocol_iteration);
1256 self.state = MachineState::PrepareIteration;
1257 }
1258 Err(error) => {
1259 self.fail_turn(make_error_event(
1260 "execution_surface",
1261 Some("reconfigure_failed"),
1262 format!("Failed to refresh execution surface: {error}"),
1263 Some(error),
1264 ));
1265 }
1266 }
1267 }
1268
1269 fn append_checkpoint_messages(&mut self, plugin_messages: &[PluginMessage], transient: bool) {
1270 let mut appended = Vec::new();
1271 for message in plugin_messages
1272 .iter()
1273 .filter(|message| matches!(message.role, MessageRole::User | MessageRole::System))
1274 {
1275 let message_id = self.next_synthetic_message_id("checkpoint");
1276 let mut parts = if message.parts.is_empty() {
1277 vec![Part {
1278 id: format!("{message_id}.p0"),
1279 kind: PartKind::Text,
1280 content: message.content.clone(),
1281 attachment: None,
1282 tool_call_id: None,
1283 tool_name: None,
1284 tool_replay: None,
1285 prune_state: PruneState::Intact,
1286 reasoning_meta: None,
1287 response_meta: None,
1288 }]
1289 } else {
1290 message.parts.clone()
1291 };
1292 reassign_part_ids(&message_id, &mut parts);
1293 appended.push(Message {
1294 id: message_id.clone(),
1295 role: message.role,
1296 parts: Arc::new(parts),
1297 origin: message.origin.clone().or_else(|| {
1298 Some(MessageOrigin::Plugin {
1299 plugin_id: "plugin".to_string(),
1300 transient,
1301 })
1302 }),
1303 });
1304 }
1305 if !appended.is_empty() {
1306 self.messages.extend(appended);
1307 }
1308 }
1309
1310 fn append_turn_causes(&mut self, causes: Vec<TurnCause>) {
1311 if causes.is_empty() {
1312 return;
1313 }
1314 let mut existing_ids = self
1315 .turn_causes
1316 .iter()
1317 .map(|cause| cause.id.clone())
1318 .collect::<HashSet<_>>();
1319 for cause in causes {
1320 if !existing_ids.insert(cause.id.clone()) {
1321 continue;
1322 }
1323 self.messages.push(cause.to_event_message());
1324 self.turn_causes.push(cause);
1325 }
1326 }
1327
1328 fn handle_checkpoint(&mut self, id: EffectId, delivery: CheckpointDelivery) {
1329 let (effect_id, checkpoint, on_empty) =
1330 match std::mem::replace(&mut self.state, MachineState::Finished) {
1331 MachineState::WaitingCheckpoint {
1332 effect_id,
1333 checkpoint,
1334 on_empty,
1335 } => (effect_id, checkpoint, on_empty),
1336 other => {
1337 self.state = other;
1338 return;
1339 }
1340 };
1341 if effect_id != id {
1342 self.state = MachineState::WaitingCheckpoint {
1343 effect_id,
1344 checkpoint,
1345 on_empty,
1346 };
1347 return;
1348 }
1349
1350 if !delivery.messages.is_empty()
1351 || !delivery.transient_messages.is_empty()
1352 || !delivery.turn_causes.is_empty()
1353 {
1354 self.append_checkpoint_messages(&delivery.messages, false);
1355 self.append_checkpoint_messages(&delivery.transient_messages, true);
1356 self.append_turn_causes(delivery.turn_causes);
1357 if matches!(checkpoint, CheckpointKind::BeforeCompletion) {
1358 self.protocol_iteration += 1;
1359 if self.termination.should_force_exit_after_grace_turn() {
1360 self.emit_progress();
1361 self.finish(TurnOutcome::Stopped(TurnStop::MaxTurns));
1362 return;
1363 }
1364 self.schedule_configured_turn_limit_final();
1365 }
1366 self.state = MachineState::PrepareIteration;
1367 self.emit_progress();
1368 return;
1369 }
1370
1371 match on_empty {
1372 CheckpointResumeAction::PrepareIteration => {
1373 self.state = MachineState::PrepareIteration;
1374 }
1375 CheckpointResumeAction::Finish(outcome) => self.finish(outcome),
1376 }
1377 }
1378
1379 fn take_waiting_llm_state(&mut self, id: EffectId) -> Option<WaitingLlmState<M>> {
1380 match std::mem::replace(&mut self.state, MachineState::Finished) {
1381 MachineState::WaitingLlm {
1382 effect_id,
1383 request,
1384 driver_state,
1385 } if effect_id == id => Some(WaitingLlmState {
1386 request,
1387 driver_state,
1388 }),
1389 other => {
1390 self.state = other;
1391 None
1392 }
1393 }
1394 }
1395
1396 fn handle_llm_complete(
1397 &mut self,
1398 id: EffectId,
1399 result: Result<LlmResponse, LlmCallError>,
1400 text_streamed: bool,
1401 ) {
1402 let Some(waiting) = self.take_waiting_llm_state(id) else {
1403 return;
1404 };
1405 match result {
1406 Err(error) => {
1407 self.emit_llm_error(error);
1408 }
1409 Ok(mut llm_response) => {
1410 refine_terminal_reason_for_context_window(
1414 &mut llm_response,
1415 self.config.max_context_tokens,
1416 );
1417 self.record_llm_usage(&llm_response, self.llm_response_text(&llm_response));
1418 if self.handle_terminal_llm_response(&llm_response, text_streamed) {
1419 return;
1420 }
1421 let actions = {
1422 let driver = Arc::clone(&self.config.protocol_driver);
1423 let ctx = self.driver_context();
1424 driver.handle_llm_success(ctx, waiting, llm_response, text_streamed)
1425 };
1426 self.apply_actions(actions);
1427 }
1428 }
1429 }
1430
1431 fn handle_terminal_llm_response(
1432 &mut self,
1433 llm_response: &LlmResponse,
1434 text_streamed: bool,
1435 ) -> bool {
1436 let outcome = match llm_response.terminal_reason {
1437 LlmTerminalReason::OutputLimit => TurnOutcome::Stopped(TurnStop::Incomplete),
1438 LlmTerminalReason::ContextOverflow => TurnOutcome::Stopped(TurnStop::ProviderError),
1439 LlmTerminalReason::ContentFilter => TurnOutcome::Stopped(TurnStop::ProviderError),
1440 LlmTerminalReason::ProviderError => TurnOutcome::Stopped(TurnStop::ProviderError),
1441 LlmTerminalReason::Cancelled => TurnOutcome::Stopped(TurnStop::Cancelled),
1442 LlmTerminalReason::Stop | LlmTerminalReason::ToolUse | LlmTerminalReason::Unknown => {
1443 return false;
1444 }
1445 };
1446
1447 if !text_streamed && !llm_response.full_text.is_empty() {
1448 self.emit(SessionEvent::TextDelta {
1449 content: llm_response.full_text.clone(),
1450 });
1451 }
1452 self.emit(SessionEvent::LlmResponse {
1453 protocol_iteration: self.protocol_iteration,
1454 content: llm_response.full_text.clone(),
1455 duration_ms: 0,
1456 });
1457 let reason = llm_response.terminal_reason;
1458 let diagnostic = llm_response
1459 .terminal_diagnostic
1460 .clone()
1461 .unwrap_or_else(|| format!("Model call ended with terminal reason {reason:?}."));
1462 self.emit(SessionEvent::Error {
1463 message: diagnostic.clone(),
1464 envelope: Some(crate::session_model::make_error_envelope(
1465 "llm_provider",
1466 Some(reason.code()),
1467 Some(reason),
1468 diagnostic,
1469 None,
1470 )),
1471 });
1472 self.finish(outcome);
1473 true
1474 }
1475
    /// Returns the response text, i.e. a borrow of `llm_response.full_text`.
    ///
    /// The returned slice borrows from `llm_response`, not from `self`.
    fn llm_response_text<'a>(&self, llm_response: &'a LlmResponse) -> &'a str {
        &llm_response.full_text
    }
1479
1480 fn llm_response_debug_parts(&self, llm_response: &LlmResponse) -> Option<Value> {
1481 let parts = llm_response
1482 .parts
1483 .iter()
1484 .filter_map(|part| match part {
1485 LlmOutputPart::Text { text, .. } if !text.is_empty() => Some(serde_json::json!({
1486 "type": "text",
1487 "text": text,
1488 })),
1489 LlmOutputPart::Text { .. } => None,
1490 LlmOutputPart::Reasoning {
1491 text,
1492 replay,
1493 } => Some(serde_json::json!({
1494 "type": "reasoning",
1495 "id": replay.as_ref().and_then(|meta| meta.item_id.as_ref()),
1496 "summary": replay.as_ref().map(|meta| &meta.summary),
1497 "text": text,
1498 "has_encrypted": replay.as_ref().is_some_and(|meta| meta.encrypted_content.is_some() || meta.signature.is_some()),
1499 "redacted": replay.as_ref().is_some_and(|meta| meta.redacted),
1500 })),
1501 LlmOutputPart::ToolCall {
1502 call_id,
1503 tool_name,
1504 input_json,
1505 replay,
1506 } => Some(serde_json::json!({
1507 "type": "tool_call",
1508 "call_id": call_id,
1509 "tool_name": tool_name,
1510 "input_json": input_json,
1511 "id": replay.as_ref().and_then(|meta| meta.item_id.as_ref()),
1512 "has_opaque": replay.as_ref().is_some_and(|meta| meta.opaque.is_some()),
1513 })),
1514 })
1515 .collect::<Vec<_>>();
1516 (!parts.is_empty()).then_some(Value::Array(parts))
1517 }
1518
1519 fn record_llm_usage(&mut self, llm_response: &LlmResponse, response_text: &str) {
1520 let usage = token_usage_from_llm_usage(&llm_response.usage);
1521 self.cumulative_usage.add(&usage);
1522 self.emit(SessionEvent::TokenUsage {
1523 protocol_iteration: self.protocol_iteration,
1524 usage: usage.clone(),
1525 cumulative: self.cumulative_usage.clone(),
1526 });
1527 if self.config.emit_llm_trace {
1528 let response_parts = self.llm_response_debug_parts(llm_response);
1529 self.pending_effects.push_back(Effect::Log {
1530 event: LogEvent::LlmDebug {
1531 session_id: self.config.session_id.clone(),
1532 protocol_iteration: self.protocol_iteration,
1533 usage,
1534 provider_usage: llm_response.provider_usage.clone(),
1535 request_body: llm_response.request_body.clone(),
1536 response_text: response_text.to_string(),
1537 response_parts,
1538 },
1539 });
1540 }
1541 }
1542
1543 fn record_llm_error(&mut self, error: &LlmCallError) {
1544 if self.config.emit_llm_trace {
1545 self.pending_effects.push_back(Effect::Log {
1546 event: LogEvent::LlmError {
1547 session_id: self.config.session_id.clone(),
1548 protocol_iteration: self.protocol_iteration,
1549 request_body: error.request_body.clone(),
1550 message: error.message.clone(),
1551 retryable: error.retryable,
1552 raw: error.raw.clone(),
1553 code: error.code.clone(),
1554 terminal_reason: error.terminal_reason,
1555 },
1556 });
1557 }
1558 }
1559
1560 fn emit_llm_error(&mut self, error: LlmCallError) {
1561 self.record_llm_error(&error);
1562 self.emit(SessionEvent::Error {
1563 message: format!("LLM error: {}", error.message),
1564 envelope: Some(crate::session_model::make_error_envelope(
1565 "llm_provider",
1566 error.code.as_deref(),
1567 Some(error.terminal_reason),
1568 format!("LLM error: {}", error.message),
1569 error.raw,
1570 )),
1571 });
1572 self.finish(TurnOutcome::Stopped(TurnStop::ProviderError));
1573 }
1574
1575 fn handle_tool_results(&mut self, id: EffectId, completed: Vec<CompletedToolCall>) {
1576 let (waiting_effect_id, waiting_calls) =
1577 match std::mem::replace(&mut self.state, MachineState::Finished) {
1578 MachineState::WaitingTools { effect_id, calls } => (effect_id, calls),
1579 other => {
1580 self.state = other;
1581 return;
1582 }
1583 };
1584
1585 if waiting_effect_id != id {
1586 self.state = MachineState::WaitingTools {
1587 effect_id: waiting_effect_id,
1588 calls: waiting_calls,
1589 };
1590 return;
1591 }
1592
1593 for outcome in &completed {
1594 self.emit(SessionEvent::ToolCall {
1595 call_id: Some(outcome.call_id.clone()),
1596 name: outcome.tool_name.clone(),
1597 args: outcome.args.clone(),
1598 output: outcome.output.clone(),
1599 duration_ms: outcome.duration_ms,
1600 });
1601 }
1602
1603 let actions = {
1604 let driver = Arc::clone(&self.config.protocol_driver);
1605 let ctx = self.driver_context();
1606 driver.handle_tool_results(ctx, completed)
1607 };
1608 self.apply_actions(actions);
1609 }
1610
1611 fn take_waiting_exec_state(&mut self, id: EffectId) -> Option<WaitingExecState<M>> {
1612 match std::mem::replace(&mut self.state, MachineState::Finished) {
1613 MachineState::WaitingExec {
1614 effect_id,
1615 code: _,
1616 driver_state,
1617 } if effect_id == id => Some(WaitingExecState { driver_state }),
1618 other => {
1619 self.state = other;
1620 None
1621 }
1622 }
1623 }
1624
1625 fn handle_exec_result(&mut self, id: EffectId, result: Result<crate::ExecResponse, String>) {
1626 let Some(waiting) = self.take_waiting_exec_state(id) else {
1627 return;
1628 };
1629 let actions = {
1630 let driver = Arc::clone(&self.config.protocol_driver);
1631 let ctx = self.driver_context();
1632 driver.handle_exec_result(ctx, waiting, result)
1633 };
1634 self.apply_actions(actions);
1635 }
1636}
1637
1638fn token_usage_from_llm_usage(usage: &crate::llm::types::LlmUsage) -> TokenUsage {
1639 TokenUsage {
1640 input_tokens: usage.input_tokens,
1641 output_tokens: usage.output_tokens,
1642 cached_input_tokens: usage.cached_input_tokens,
1643 reasoning_tokens: usage.reasoning_tokens,
1644 }
1645}
1646
1647fn refine_terminal_reason_for_context_window(
1655 response: &mut LlmResponse,
1656 max_context_tokens: Option<usize>,
1657) {
1658 if response.terminal_reason != LlmTerminalReason::OutputLimit {
1659 return;
1660 }
1661 if response.usage.output_tokens != 0 {
1662 return;
1663 }
1664 let Some(max_context_tokens) = max_context_tokens.filter(|value| *value > 0) else {
1665 return;
1666 };
1667 let prompt_tokens = response
1668 .usage
1669 .input_tokens
1670 .saturating_add(response.usage.cached_input_tokens)
1671 .max(0) as usize;
1672 if prompt_tokens >= max_context_tokens.saturating_mul(95) / 100 {
1673 response.terminal_reason = LlmTerminalReason::ContextOverflow;
1674 response.terminal_diagnostic = Some(
1675 "Model produced no output because the prompt reached the configured context window."
1676 .to_string(),
1677 );
1678 }
1679}
1680
1681#[cfg(test)]
1682mod tests;