1use std::collections::{HashSet, VecDeque};
8use std::fmt::Debug;
9use std::sync::Arc;
10
11use serde::Serialize;
12use serde::de::DeserializeOwned;
13use serde_json::Value;
14
15use crate::llm::types::{
16 LlmAttachment, LlmOutputPart, LlmRequest, LlmResponse, LlmTerminalReason, LlmToolChoice,
17 LlmToolSpec, ProviderReplayMeta,
18};
19use crate::session_model::message::MessageOrigin;
20use crate::session_model::{
21 Message, MessageRole, MessageSequence, Part, PartKind, PruneState, SessionEvent,
22 SessionEventRecord, TokenUsage, TurnTerminationPolicyState, make_error_event,
23 reassign_part_ids, render_prompt,
24};
25use crate::{
26 CheckpointKind, ModelToolReturn, PluginMessage, ToolCallOutput, TurnOutcome, TurnStop,
27};
28
/// Type-level bundle of the protocol-specific types a [`TurnMachine`] is generic over.
///
/// Within this module the implementing type only appears as a type parameter
/// (e.g. `TurnMachine<M>`); values are only ever created for its associated types.
pub trait TurnProtocol: Send + Sync + 'static {
    /// Protocol-specific event payload, stored in the machine's event log as
    /// `SessionEventRecord<Self::Event>` alongside conversation records.
    type Event: Clone + Serialize + DeserializeOwned + Debug + Send + Sync + 'static;
    /// Protocol-specific termination settings, held in
    /// `TurnMachineConfig::termination` and exposed to drivers through
    /// `DriverContextView::termination`.
    type Termination: Clone + Default + Debug + Send + Sync + 'static;
    /// Opaque driver state that the machine parks while an LLM call or code
    /// execution is outstanding, and hands back to the driver afterwards
    /// (via `WaitingLlmState` / `WaitingExecState`).
    type DriverState: Clone + Default + Serialize + DeserializeOwned + Debug + Send + Sync + 'static;
}
36
/// Default [`TurnProtocol`]: no protocol events (`()`), no termination settings
/// (`()`), and driver state kept as an untyped JSON value.
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct UnitTurnProtocol;

impl TurnProtocol for UnitTurnProtocol {
    type Event = ();
    type Termination = ();
    type DriverState = serde_json::Value;
}
45
/// Correlates an [`Effect`] with the [`Response`] the host sends back for it.
///
/// Allocated from a per-machine counter (`TurnMachine::next_id`) that starts at 1.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, serde::Deserialize)]
pub struct EffectId(pub u64);

/// A tool invocation requested by the protocol driver and sent to the host in
/// [`Effect::ToolCalls`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct PendingToolCall {
    /// Identifier of this call. NOTE(review): presumably assigned by the LLM
    /// provider and echoed in `CompletedToolCall::call_id` — confirm.
    pub call_id: String,
    pub tool_name: String,
    /// Tool arguments as raw JSON.
    pub args: Value,
    /// Provider replay metadata, if any.
    pub replay: Option<ProviderReplayMeta>,
}

/// A finished tool invocation, returned by the host in [`Response::ToolResults`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct CompletedToolCall {
    pub call_id: String,
    pub tool_name: String,
    /// Tool arguments as raw JSON.
    pub args: Value,
    pub output: ToolCallOutput,
    pub model_return: ModelToolReturn,
    /// Execution duration reported by the host (milliseconds, per the field name).
    pub duration_ms: u64,
    pub replay: Option<ProviderReplayMeta>,
}
70
/// An event that caused the current turn to run.
///
/// `ChatContextProjector` renders active causes in a dedicated "TURN EVENTS"
/// block (see `render_turn_causes_prompt`) and removes `Event`-role history
/// messages with the same `id`, so each cause appears once in the prompt.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, serde::Deserialize)]
pub struct TurnCause {
    /// Message id of the cause; also used as the id of its event message.
    pub id: String,
    /// Short event type label shown in the rendered prompt header.
    pub event_type: String,
    pub origin: MessageOrigin,
    /// Event body; rendered trimmed in the prompt.
    pub text: String,
}
78
79impl TurnCause {
80 pub fn to_event_message(&self) -> Message {
81 Message {
82 id: self.id.clone(),
83 role: MessageRole::Event,
84 parts: Arc::new(vec![Part {
85 id: format!("{}.p0", self.id),
86 kind: PartKind::Text,
87 content: self.text.clone(),
88 attachment: None,
89 tool_call_id: None,
90 tool_name: None,
91 tool_replay: None,
92 prune_state: PruneState::Intact,
93 reasoning_meta: None,
94 response_meta: None,
95 }]),
96 origin: Some(self.origin.clone()),
97 }
98 }
99}
100
/// Payload the host returns for an [`Effect::Checkpoint`], via
/// [`Response::Checkpoint`].
///
/// It is consumed by `TurnMachine::handle_checkpoint`.
#[derive(Clone, Debug, Default, Serialize, serde::Deserialize)]
pub struct CheckpointDelivery {
    /// Plugin messages delivered at this checkpoint.
    pub messages: Vec<PluginMessage>,
    /// Plugin messages flagged as transient. NOTE(review): presumably not
    /// persisted to history — confirm against `handle_checkpoint`.
    pub transient_messages: Vec<PluginMessage>,
    /// New causes to present to the model for this turn.
    pub turn_causes: Vec<TurnCause>,
}
107
108pub fn render_turn_causes_prompt(causes: &[TurnCause]) -> Option<String> {
109 if causes.is_empty() {
110 return None;
111 }
112
113 let mut rendered = String::from("=== TURN EVENTS ===");
114 for (index, cause) in causes.iter().enumerate() {
115 rendered.push_str("\n\n");
116 rendered.push_str(&format!(
117 "--- event[{index}] · {} · {} ---\n",
118 cause.event_type, cause.id
119 ));
120 rendered.push_str("Origin: ");
121 rendered.push_str(&render_message_origin(&cause.origin));
122 rendered.push_str("\n\n");
123 rendered.push_str(cause.text.trim());
124 }
125 Some(rendered)
126}
127
128fn render_message_origin(origin: &MessageOrigin) -> String {
129 match origin {
130 MessageOrigin::Plugin {
131 plugin_id,
132 transient,
133 } => {
134 if *transient {
135 format!("plugin {plugin_id} (transient)")
136 } else {
137 format!("plugin {plugin_id}")
138 }
139 }
140 MessageOrigin::Process {
141 process_id,
142 event_type,
143 sequence,
144 wake_id,
145 ..
146 } => match wake_id {
147 Some(wake_id) => {
148 format!("process {process_id} {event_type} #{sequence} ({wake_id})")
149 }
150 None => format!("process {process_id} {event_type} #{sequence}"),
151 },
152 }
153}
154
/// Diagnostic records the machine hands to the host through [`Effect::Log`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub enum LogEvent {
    /// Trace of a completed LLM exchange: usage, optional raw request body and
    /// provider usage, and the response text/parts.
    LlmDebug {
        session_id: String,
        protocol_iteration: usize,
        usage: TokenUsage,
        provider_usage: Option<Value>,
        request_body: Option<String>,
        response_text: String,
        response_parts: Option<Value>,
    },
    /// A failed LLM call; fields mirror [`LlmCallError`] plus session context.
    LlmError {
        session_id: String,
        protocol_iteration: usize,
        request_body: Option<String>,
        message: String,
        retryable: bool,
        raw: Option<String>,
        code: Option<String>,
        terminal_reason: LlmTerminalReason,
    },
}
177
/// Work the host must perform, obtained by polling `TurnMachine::poll_effect`.
///
/// Effects that carry an [`EffectId`] are correlated with a [`Response`] by
/// that id. `Log`, `Emit`, `Progress` and `Done` carry no id and need no reply.
///
/// `Clone` is implemented by hand (not derived) so that it does not require
/// `M: Clone`.
#[derive(Debug, Serialize, serde::Deserialize)]
// NOTE(review): variant sizes differ widely (e.g. `Progress` vs `CancelLlm`);
// the lint is silenced rather than boxing the large variants.
#[allow(clippy::large_enum_variant)]
pub enum Effect<M: TurnProtocol = UnitTurnProtocol> {
    /// Refresh the system prompt / tool specs; answered by
    /// [`Response::ExecutionSurfaceSynced`].
    ///
    /// `update_machine_config` is `false` for the initial sync issued by
    /// `prepare_protocol` and `true` for per-iteration re-syncs.
    SyncExecutionSurface {
        id: EffectId,
        update_machine_config: bool,
    },
    /// Send `request` to the model; answered by [`Response::LlmComplete`].
    LlmCall {
        id: EffectId,
        request: Arc<LlmRequest>,
    },
    /// Cancellation request for the LLM call with this id (there is no matching
    /// `Response` variant).
    CancelLlm { id: EffectId },
    /// Execute a batch of tool calls; answered by [`Response::ToolResults`].
    ToolCalls {
        id: EffectId,
        calls: Vec<PendingToolCall>,
    },
    /// Execute `code`; answered by [`Response::ExecResult`].
    ExecCode { id: EffectId, code: String },
    /// Pause at a checkpoint; answered by [`Response::Checkpoint`].
    Checkpoint {
        id: EffectId,
        checkpoint: CheckpointKind,
    },
    /// Diagnostic record.
    Log { event: LogEvent },
    /// Session event for observers.
    Emit(SessionEvent),
    /// Snapshot of the current messages plus the events appended since the
    /// previous `Progress`/`Done` delta.
    Progress {
        messages: MessageSequence,
        event_delta: Vec<SessionEventRecord<M::Event>>,
        protocol_iteration: usize,
    },
    /// Final effect of a turn.
    ///
    /// `messages` is moved out of the machine, and `event_delta` holds the
    /// events not yet reported.
    Done {
        messages: MessageSequence,
        event_delta: Vec<SessionEventRecord<M::Event>>,
        protocol_iteration: usize,
    },
}
238
239impl<M: TurnProtocol> Clone for Effect<M> {
240 fn clone(&self) -> Self {
241 match self {
242 Self::SyncExecutionSurface {
243 id,
244 update_machine_config,
245 } => Self::SyncExecutionSurface {
246 id: *id,
247 update_machine_config: *update_machine_config,
248 },
249 Self::LlmCall { id, request } => Self::LlmCall {
250 id: *id,
251 request: Arc::clone(request),
252 },
253 Self::CancelLlm { id } => Self::CancelLlm { id: *id },
254 Self::ToolCalls { id, calls } => Self::ToolCalls {
255 id: *id,
256 calls: calls.clone(),
257 },
258 Self::ExecCode { id, code } => Self::ExecCode {
259 id: *id,
260 code: code.clone(),
261 },
262 Self::Checkpoint { id, checkpoint } => Self::Checkpoint {
263 id: *id,
264 checkpoint: *checkpoint,
265 },
266 Self::Log { event } => Self::Log {
267 event: event.clone(),
268 },
269 Self::Emit(event) => Self::Emit(event.clone()),
270 Self::Progress {
271 messages,
272 event_delta,
273 protocol_iteration,
274 } => Self::Progress {
275 messages: messages.clone(),
276 event_delta: event_delta.clone(),
277 protocol_iteration: *protocol_iteration,
278 },
279 Self::Done {
280 messages,
281 event_delta,
282 protocol_iteration,
283 } => Self::Done {
284 messages: messages.clone(),
285 event_delta: event_delta.clone(),
286 protocol_iteration: *protocol_iteration,
287 },
288 }
289 }
290}
291
292impl<M: TurnProtocol> Effect<M> {
293 fn id(&self) -> Option<EffectId> {
294 match self {
295 Self::SyncExecutionSurface { id, .. }
296 | Self::LlmCall { id, .. }
297 | Self::CancelLlm { id }
298 | Self::ToolCalls { id, .. }
299 | Self::ExecCode { id, .. }
300 | Self::Checkpoint { id, .. } => Some(*id),
301 Self::Log { .. } | Self::Emit(_) | Self::Progress { .. } | Self::Done { .. } => None,
302 }
303 }
304}
305
/// Failure of an [`Effect::LlmCall`], reported in [`Response::LlmComplete`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct LlmCallError {
    /// Human-readable error message.
    pub message: String,
    /// Whether the host considers the failure retryable.
    pub retryable: bool,
    /// Raw provider error payload, if available.
    pub raw: Option<String>,
    /// Provider error code, if available.
    pub code: Option<String>,
    pub terminal_reason: LlmTerminalReason,
    /// Serialized request body, if the host captured it.
    pub request_body: Option<String>,
}

/// Host reply to an id-carrying [`Effect`], fed back through
/// `TurnMachine::handle_response`.
///
/// For `ExecutionSurfaceSynced`, a reply whose `id` does not match the effect
/// the machine is waiting on is ignored.
pub enum Response {
    /// Reply to [`Effect::SyncExecutionSurface`].
    ///
    /// - `Ok(Some(_))` replaces the configured system prompt and tool specs.
    /// - `Ok(None)` leaves them unchanged.
    /// - `Err(_)` fails the turn.
    ExecutionSurfaceSynced {
        id: EffectId,
        result: Result<Option<ExecutionSurfaceSync>, String>,
    },
    /// Reply to [`Effect::LlmCall`].
    ///
    /// `text_streamed` is passed on to the driver's `handle_llm_success` hook.
    LlmComplete {
        id: EffectId,
        result: Result<LlmResponse, LlmCallError>,
        text_streamed: bool,
    },
    /// Reply to [`Effect::ToolCalls`].
    ToolResults {
        id: EffectId,
        results: Vec<CompletedToolCall>,
    },
    /// Reply to [`Effect::ExecCode`].
    ExecResult {
        id: EffectId,
        result: Result<crate::ExecResponse, String>,
    },
    /// Reply to [`Effect::Checkpoint`].
    Checkpoint {
        id: EffectId,
        delivery: CheckpointDelivery,
    },
}

/// Refreshed execution surface returned by the host in
/// [`Response::ExecutionSurfaceSynced`].
///
/// When present, it overwrites `TurnMachineConfig::system_prompt` and
/// `TurnMachineConfig::tool_specs`.
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct ExecutionSurfaceSync {
    pub system_prompt: Arc<str>,
    pub tool_specs: Arc<Vec<LlmToolSpec>>,
}
354
355pub struct WaitingLlmState<M: TurnProtocol = UnitTurnProtocol> {
356 pub request: Arc<LlmRequest>,
357 driver_state: Option<M::DriverState>,
358}
359
360impl<M: TurnProtocol> WaitingLlmState<M> {
361 pub fn take_driver_state(&mut self) -> Option<M::DriverState> {
362 self.driver_state.take()
363 }
364}
365
366pub struct WaitingExecState<M: TurnProtocol = UnitTurnProtocol> {
367 driver_state: M::DriverState,
368}
369
370impl<M: TurnProtocol> WaitingExecState<M> {
371 pub fn into_driver_state(self) -> M::DriverState {
372 self.driver_state
373 }
374}
375
/// What to do after a checkpoint whose delivery turns out to be empty.
///
/// It is stored as `on_empty` in the machine's `WaitingCheckpoint` state.
#[derive(Clone, Debug, PartialEq, Serialize, serde::Deserialize)]
pub enum CheckpointResumeAction {
    /// Go on to prepare the next protocol iteration.
    PrepareIteration,
    /// End the turn with the given outcome.
    Finish(TurnOutcome),
}

/// Instructions a [`ProtocolDriverHandle`] returns to the machine.
///
/// `TurnMachine::apply_actions` applies them in order.
#[allow(clippy::large_enum_variant)]
pub enum DriverAction<M: TurnProtocol = UnitTurnProtocol> {
    /// Queue a session event as [`Effect::Emit`].
    Emit(SessionEvent),
    /// Append records to the event log.
    ///
    /// Conversation records are also added to the message history. A non-empty
    /// batch triggers a `Progress` effect.
    AppendEvents(Vec<SessionEventRecord<M::Event>>),
    /// Start an LLM call.
    ///
    /// `driver_state` is parked until the call completes.
    StartLlm {
        request: Arc<LlmRequest>,
        driver_state: Option<M::DriverState>,
    },
    /// Start executing a batch of tool calls.
    StartTools {
        calls: Vec<PendingToolCall>,
    },
    /// Start executing code.
    ///
    /// `driver_state` is parked until the result arrives.
    StartExec {
        code: String,
        driver_state: M::DriverState,
    },
    /// Pause at a checkpoint.
    StartCheckpoint {
        checkpoint: CheckpointKind,
        on_empty: CheckpointResumeAction,
    },
    /// Increment the protocol iteration.
    ///
    /// This forces a new execution-surface sync when syncing is enabled.
    AdvanceProtocolIteration,
    /// Push `message` into history if the termination policy says a turn-limit
    /// final message is due; otherwise it is dropped.
    ScheduleTurnLimitFinal {
        message: Message,
    },
    /// End the turn. Any actions after this one in the same batch are ignored.
    Finish(TurnOutcome),
}
407
408pub struct DriverContextView<'a, M: TurnProtocol = UnitTurnProtocol> {
409 config: &'a TurnMachineConfig<M>,
410 messages: &'a MessageSequence,
411 events: &'a [SessionEventRecord<M::Event>],
412 turn_causes: &'a [TurnCause],
413 protocol_iteration: usize,
414 protocol_run_offset: usize,
415 termination: &'a TurnTerminationPolicyState,
416}
417
418impl<'a, M: TurnProtocol> DriverContextView<'a, M> {
419 pub fn project_llm_request(&self, use_tools: bool) -> Arc<LlmRequest> {
420 self.config.projector.project(ProjectorContext {
421 config: self.config,
422 messages: self.messages,
423 events: self.events,
424 turn_causes: self.turn_causes,
425 protocol_iteration: self.protocol_iteration,
426 use_tools,
427 })
428 }
429
430 pub fn protocol_iteration(&self) -> usize {
431 self.protocol_iteration
432 }
433
434 pub fn protocol_run_offset(&self) -> usize {
435 self.protocol_run_offset
436 }
437
438 pub fn max_turns(&self) -> Option<usize> {
439 self.config.max_turns
440 }
441
442 pub fn termination(&self) -> &M::Termination {
443 &self.config.termination
444 }
445
446 pub fn autonomous(&self) -> bool {
447 self.config.autonomous
448 }
449
450 pub fn should_force_exit_after_grace_turn(&self) -> bool {
451 self.termination.should_force_exit_after_grace_turn()
452 }
453
454 pub fn turn_limit_final_to_schedule(&self) -> Option<usize> {
455 self.termination.turn_limit_final_to_schedule(
456 self.protocol_iteration,
457 self.protocol_run_offset,
458 self.config.max_turns,
459 )
460 }
461
462 pub fn messages(&self) -> &MessageSequence {
463 self.messages
464 }
465
466 pub fn events(&self) -> &[SessionEventRecord<M::Event>] {
467 self.events
468 }
469
470 pub fn turn_causes(&self) -> &[TurnCause] {
471 self.turn_causes
472 }
473}
474
/// Inputs available to a [`ContextProjector`] when building an LLM request.
pub struct ProjectorContext<'a, M: TurnProtocol = UnitTurnProtocol> {
    pub config: &'a TurnMachineConfig<M>,
    pub messages: &'a MessageSequence,
    pub events: &'a [SessionEventRecord<M::Event>],
    pub turn_causes: &'a [TurnCause],
    pub protocol_iteration: usize,
    /// Whether the caller wants tools offered in this request.
    pub use_tools: bool,
}

/// Strategy that turns machine state into an [`LlmRequest`].
///
/// It is invoked through `DriverContextView::project_llm_request`.
pub trait ContextProjector<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
    fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest>;
}
487
488#[derive(Clone, Debug, Default)]
489pub struct ChatContextProjector;
490
491impl<M: TurnProtocol> ContextProjector<M> for ChatContextProjector {
492 fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest> {
493 let rendered_prompt = render_messages_for_projector(ctx.messages, ctx.turn_causes);
494 let attachments: Vec<LlmAttachment> = rendered_prompt.attachments;
495 let mut messages = rendered_prompt.messages;
496 if let Some(turn_events) = render_turn_causes_prompt(ctx.turn_causes) {
497 messages.push(crate::llm::types::LlmMessage::text(
498 crate::llm::types::LlmRole::User,
499 Arc::from(turn_events),
500 ));
501 }
502 if !ctx.config.system_prompt.trim().is_empty() {
503 messages.insert(
504 0,
505 crate::llm::types::LlmMessage::text(
506 crate::llm::types::LlmRole::System,
507 Arc::clone(&ctx.config.system_prompt),
508 ),
509 );
510 }
511
512 Arc::new(LlmRequest {
513 model: ctx.config.model.clone(),
514 messages,
515 attachments,
516 tools: if ctx.use_tools {
517 Arc::clone(&ctx.config.tool_specs)
518 } else {
519 Arc::new(Vec::new())
520 },
521 tool_choice: if ctx.use_tools {
522 LlmToolChoice::Auto
523 } else {
524 LlmToolChoice::None
525 },
526 model_variant: ctx.config.model_variant.clone(),
527 generation: ctx.config.generation.clone(),
528 session_id: ctx.config.run_session_id.clone(),
529 output_spec: None,
530 stream_events: None,
531 provider_trace: None,
532 })
533 }
534}
535
536fn render_messages_for_projector(
537 messages: &MessageSequence,
538 turn_causes: &[TurnCause],
539) -> crate::RenderedPrompt {
540 if turn_causes.is_empty() {
541 return messages.render_prompt();
542 }
543
544 let active_cause_ids = turn_causes
545 .iter()
546 .map(|cause| cause.id.as_str())
547 .collect::<HashSet<_>>();
548 let filtered = messages
549 .iter()
550 .filter(|message| {
551 !(matches!(message.role, MessageRole::Event)
552 && active_cause_ids.contains(message.id.as_str()))
553 })
554 .cloned()
555 .collect::<Vec<_>>();
556 render_prompt(filtered.as_slice())
557}
558
/// Protocol-specific decision logic plugged into a [`TurnMachine`].
///
/// Each hook gets a read-only [`DriverContextView`] and returns the
/// [`DriverAction`]s the machine should apply, in order.
pub trait ProtocolDriverHandle<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
    /// Hook run when a protocol iteration is ready to start, i.e. after any
    /// required execution-surface sync.
    fn prepare_protocol_iteration(&self, ctx: DriverContextView<'_, M>) -> Vec<DriverAction<M>>;
    /// Hook for a successful LLM response.
    ///
    /// `waiting` holds the request and the driver state parked when the call
    /// started.
    fn handle_llm_success(
        &self,
        ctx: DriverContextView<'_, M>,
        waiting: WaitingLlmState<M>,
        llm_response: LlmResponse,
        text_streamed: bool,
    ) -> Vec<DriverAction<M>>;
    /// Hook for the results of an [`Effect::ToolCalls`] batch.
    fn handle_tool_results(
        &self,
        ctx: DriverContextView<'_, M>,
        completed: Vec<CompletedToolCall>,
    ) -> Vec<DriverAction<M>>;
    /// Hook for the outcome of an [`Effect::ExecCode`].
    ///
    /// `waiting` holds the driver state parked when execution started.
    fn handle_exec_result(
        &self,
        ctx: DriverContextView<'_, M>,
        waiting: WaitingExecState<M>,
        result: Result<crate::ExecResponse, String>,
    ) -> Vec<DriverAction<M>>;
}
580
/// Static configuration of a [`TurnMachine`].
///
/// `system_prompt` and `tool_specs` may be replaced mid-turn by an
/// execution-surface sync.
pub struct TurnMachineConfig<M: TurnProtocol = UnitTurnProtocol> {
    /// Driver consulted for every protocol decision.
    pub protocol_driver: Arc<dyn ProtocolDriverHandle<M>>,
    /// Builds LLM requests for `DriverContextView::project_llm_request`.
    pub projector: Arc<dyn ContextProjector<M>>,
    /// When true, the machine emits `Effect::SyncExecutionSurface` at turn start
    /// and before each protocol iteration that has not been synced yet.
    pub sync_execution_surface: bool,
    /// Model name copied into requests by `ChatContextProjector`.
    pub model: String,
    pub max_context_tokens: Option<usize>,
    /// Turn limit passed to the termination policy when deciding whether to
    /// schedule a turn-limit final message.
    pub max_turns: Option<usize>,
    /// Copied into requests by `ChatContextProjector`.
    pub model_variant: Option<String>,
    /// Copied into requests by `ChatContextProjector`.
    pub generation: crate::llm::types::GenerationOptions,
    /// Sent as `LlmRequest::session_id` by `ChatContextProjector`.
    pub run_session_id: Option<String>,
    /// Exposed to drivers via `DriverContextView::autonomous`.
    pub autonomous: bool,
    /// Tools offered to the model; names are also listed in `LlmRequest` events.
    pub tool_specs: Arc<Vec<LlmToolSpec>>,
    /// System prompt; omitted from requests when blank.
    pub system_prompt: Arc<str>,
    pub session_id: String,
    pub emit_llm_trace: bool,
    /// Protocol-specific termination settings.
    pub termination: M::Termination,
    /// Builds the turn-limit final message from `(message_id, max_turns)`.
    pub turn_limit_final_message: crate::TurnLimitFinalMessage,
}
604
/// Internal state of a [`TurnMachine`].
///
/// Every `Waiting*` state records the id of the single outstanding effect it
/// expects a response for, plus whatever is needed to re-create that effect.
/// The re-creation is used for redelivery after restoring a checkpoint.
#[derive(Debug, Serialize, serde::Deserialize)]
enum MachineState<M: TurnProtocol = UnitTurnProtocol> {
    /// Fresh turn; the next poll runs `prepare_protocol`.
    PreparingProtocol,
    /// Waiting for [`Response::ExecutionSurfaceSynced`].
    WaitingExecutionSurface {
        effect_id: EffectId,
        update_machine_config: bool,
    },
    /// Ready to run the driver's `prepare_protocol_iteration` on the next poll.
    PrepareIteration,
    /// Waiting for [`Response::LlmComplete`]; `driver_state` is parked here.
    WaitingLlm {
        effect_id: EffectId,
        request: Arc<LlmRequest>,
        driver_state: Option<M::DriverState>,
    },
    /// Waiting for [`Response::ToolResults`].
    WaitingTools {
        effect_id: EffectId,
        calls: Vec<PendingToolCall>,
    },
    /// Waiting for [`Response::ExecResult`]; `driver_state` is parked here.
    WaitingExec {
        effect_id: EffectId,
        code: String,
        driver_state: M::DriverState,
    },
    /// Waiting for [`Response::Checkpoint`].
    WaitingCheckpoint {
        effect_id: EffectId,
        checkpoint: CheckpointKind,
        on_empty: CheckpointResumeAction,
    },
    /// Turn over; `Effect::Done` has been queued.
    Finished,
}
636
637#[derive(Clone, Debug, Serialize, serde::Deserialize)]
638pub struct TurnCheckpoint<M: TurnProtocol = UnitTurnProtocol> {
639 state: MachineState<M>,
640 pending_effects: Vec<Effect<M>>,
641 next_effect_id: u64,
642 #[serde(default)]
643 next_synthetic_message_id: u64,
644 messages: Vec<Message>,
645 events: Vec<SessionEventRecord<M::Event>>,
646 #[serde(default)]
647 turn_causes: Vec<TurnCause>,
648 #[serde(default)]
649 progress_event_cursor: usize,
650 protocol_iteration: usize,
651 protocol_run_offset: usize,
652 cumulative_usage: TokenUsage,
653 termination: TurnTerminationPolicyState,
654 synced_protocol_iteration: Option<usize>,
655}
656
657impl<M: TurnProtocol> Clone for MachineState<M> {
658 fn clone(&self) -> Self {
659 match self {
660 Self::PreparingProtocol => Self::PreparingProtocol,
661 Self::WaitingExecutionSurface {
662 effect_id,
663 update_machine_config,
664 } => Self::WaitingExecutionSurface {
665 effect_id: *effect_id,
666 update_machine_config: *update_machine_config,
667 },
668 Self::PrepareIteration => Self::PrepareIteration,
669 Self::WaitingLlm {
670 effect_id,
671 request,
672 driver_state,
673 } => Self::WaitingLlm {
674 effect_id: *effect_id,
675 request: Arc::clone(request),
676 driver_state: driver_state.clone(),
677 },
678 Self::WaitingTools { effect_id, calls } => Self::WaitingTools {
679 effect_id: *effect_id,
680 calls: calls.clone(),
681 },
682 Self::WaitingExec {
683 effect_id,
684 code,
685 driver_state,
686 } => Self::WaitingExec {
687 effect_id: *effect_id,
688 code: code.clone(),
689 driver_state: driver_state.clone(),
690 },
691 Self::WaitingCheckpoint {
692 effect_id,
693 checkpoint,
694 on_empty,
695 } => Self::WaitingCheckpoint {
696 effect_id: *effect_id,
697 checkpoint: *checkpoint,
698 on_empty: on_empty.clone(),
699 },
700 Self::Finished => Self::Finished,
701 }
702 }
703}
704
705impl<M: TurnProtocol> MachineState<M> {
706 fn outstanding_effect_id(&self) -> Option<EffectId> {
707 match self {
708 Self::WaitingExecutionSurface { effect_id, .. }
709 | Self::WaitingLlm { effect_id, .. }
710 | Self::WaitingTools { effect_id, .. }
711 | Self::WaitingExec { effect_id, .. }
712 | Self::WaitingCheckpoint { effect_id, .. } => Some(*effect_id),
713 Self::PreparingProtocol | Self::PrepareIteration | Self::Finished => None,
714 }
715 }
716
717 fn outstanding_effect(&self) -> Option<Effect<M>> {
718 match self {
719 Self::WaitingExecutionSurface {
720 effect_id,
721 update_machine_config,
722 } => Some(Effect::SyncExecutionSurface {
723 id: *effect_id,
724 update_machine_config: *update_machine_config,
725 }),
726 Self::WaitingLlm {
727 effect_id, request, ..
728 } => Some(Effect::LlmCall {
729 id: *effect_id,
730 request: Arc::clone(request),
731 }),
732 Self::WaitingTools { effect_id, calls } => Some(Effect::ToolCalls {
733 id: *effect_id,
734 calls: calls.clone(),
735 }),
736 Self::WaitingExec {
737 effect_id, code, ..
738 } => Some(Effect::ExecCode {
739 id: *effect_id,
740 code: code.clone(),
741 }),
742 Self::WaitingCheckpoint {
743 effect_id,
744 checkpoint,
745 ..
746 } => Some(Effect::Checkpoint {
747 id: *effect_id,
748 checkpoint: *checkpoint,
749 }),
750 Self::PreparingProtocol | Self::PrepareIteration | Self::Finished => None,
751 }
752 }
753}
754
/// State machine that drives one agent turn without performing I/O itself.
///
/// The host pulls [`Effect`]s with `poll_effect`, performs them, and feeds the
/// outcomes back with `handle_response`. Protocol decisions are delegated to
/// the configured [`ProtocolDriverHandle`].
pub struct TurnMachine<M: TurnProtocol = UnitTurnProtocol> {
    config: TurnMachineConfig<M>,
    state: MachineState<M>,
    /// Effects queued for `poll_effect`, in emission order.
    pending_effects: VecDeque<Effect<M>>,
    /// Set on restore when the awaited effect is not in `pending_effects`, so
    /// `poll_effect` re-emits it once. Cleared by `handle_response`.
    active_effect_redelivery: bool,
    next_effect_id: u64,
    /// Counter used to build synthetic message ids.
    next_synthetic_message_id: u64,
    messages: MessageSequence,
    /// Shared event log; `Arc::make_mut` is used to append.
    events: Arc<Vec<SessionEventRecord<M::Event>>>,
    turn_causes: Vec<TurnCause>,
    /// Index of the first event not yet reported in a `Progress`/`Done` delta.
    progress_event_cursor: usize,
    /// Current protocol iteration; starts at `protocol_run_offset`.
    protocol_iteration: usize,
    protocol_run_offset: usize,
    cumulative_usage: TokenUsage,
    termination: TurnTerminationPolicyState,
    /// Iteration for which the execution surface was last synced, if any.
    synced_protocol_iteration: Option<usize>,
}
773
774impl<M: TurnProtocol> TurnMachine<M> {
775 pub fn new(
777 config: TurnMachineConfig<M>,
778 messages: Vec<Message>,
779 events: Arc<Vec<SessionEventRecord<M::Event>>>,
780 protocol_run_offset: usize,
781 ) -> Self {
782 Self::new_shared(
783 config,
784 MessageSequence::from_owned(messages),
785 events,
786 protocol_run_offset,
787 )
788 }
789
790 pub fn new_shared(
791 config: TurnMachineConfig<M>,
792 messages: MessageSequence,
793 events: Arc<Vec<SessionEventRecord<M::Event>>>,
794 protocol_run_offset: usize,
795 ) -> Self {
796 Self::new_shared_with_turn_causes(config, messages, events, protocol_run_offset, Vec::new())
797 }
798
799 pub fn new_shared_with_turn_causes(
800 config: TurnMachineConfig<M>,
801 messages: MessageSequence,
802 events: Arc<Vec<SessionEventRecord<M::Event>>>,
803 protocol_run_offset: usize,
804 turn_causes: Vec<TurnCause>,
805 ) -> Self {
806 let next_synthetic_message_id = messages.len() as u64;
807 Self {
808 config,
809 state: MachineState::PreparingProtocol,
810 pending_effects: VecDeque::new(),
811 active_effect_redelivery: false,
812 next_effect_id: 1,
813 next_synthetic_message_id,
814 messages,
815 progress_event_cursor: events.len(),
816 events,
817 turn_causes,
818 protocol_iteration: protocol_run_offset,
819 protocol_run_offset,
820 cumulative_usage: TokenUsage::default(),
821 termination: TurnTerminationPolicyState::new(),
822 synced_protocol_iteration: None,
823 }
824 }
825
826 pub fn is_done(&self) -> bool {
828 matches!(self.state, MachineState::Finished)
829 }
830
831 pub fn messages(&self) -> Arc<Vec<Message>> {
832 self.messages.shared()
833 }
834
835 pub fn events(&self) -> Arc<Vec<SessionEventRecord<M::Event>>> {
836 Arc::clone(&self.events)
837 }
838
839 pub fn message_sequence(&self) -> MessageSequence {
840 self.messages.clone()
841 }
842
843 pub fn protocol_iteration(&self) -> usize {
844 self.protocol_iteration
845 }
846
847 pub fn checkpoint(&self) -> TurnCheckpoint<M> {
848 let active_effect_id = self.state.outstanding_effect_id();
849 let pending_effects = self
850 .pending_effects
851 .iter()
852 .filter(|effect| active_effect_id.is_none_or(|id| effect.id() != Some(id)))
853 .cloned()
854 .collect::<Vec<_>>();
855 TurnCheckpoint {
856 state: self.state.clone(),
857 pending_effects,
858 next_effect_id: self.next_effect_id,
859 next_synthetic_message_id: self.next_synthetic_message_id,
860 messages: self.messages.iter().cloned().collect(),
861 events: self.events.as_ref().clone(),
862 turn_causes: self.turn_causes.clone(),
863 progress_event_cursor: self.progress_event_cursor,
864 protocol_iteration: self.protocol_iteration,
865 protocol_run_offset: self.protocol_run_offset,
866 cumulative_usage: self.cumulative_usage.clone(),
867 termination: self.termination.clone(),
868 synced_protocol_iteration: self.synced_protocol_iteration,
869 }
870 }
871
872 pub fn restore_from_checkpoint(
873 config: TurnMachineConfig<M>,
874 checkpoint: TurnCheckpoint<M>,
875 ) -> Self {
876 let active_effect_id = checkpoint.state.outstanding_effect_id();
877 let pending_effects = checkpoint
878 .pending_effects
879 .into_iter()
880 .collect::<VecDeque<_>>();
881 let active_effect_redelivery = active_effect_id.is_some()
882 && !pending_effects
883 .iter()
884 .any(|effect| effect.id() == active_effect_id);
885 Self {
886 config,
887 state: checkpoint.state,
888 pending_effects,
889 active_effect_redelivery,
890 next_effect_id: checkpoint.next_effect_id,
891 next_synthetic_message_id: checkpoint.next_synthetic_message_id,
892 messages: MessageSequence::from_owned(checkpoint.messages),
893 events: Arc::new(checkpoint.events),
894 turn_causes: checkpoint.turn_causes,
895 progress_event_cursor: checkpoint.progress_event_cursor,
896 protocol_iteration: checkpoint.protocol_iteration,
897 protocol_run_offset: checkpoint.protocol_run_offset,
898 cumulative_usage: checkpoint.cumulative_usage,
899 termination: checkpoint.termination,
900 synced_protocol_iteration: checkpoint.synced_protocol_iteration,
901 }
902 }
903
904 fn driver_context(&self) -> DriverContextView<'_, M> {
905 DriverContextView {
906 config: &self.config,
907 messages: &self.messages,
908 events: self.events.as_slice(),
909 turn_causes: &self.turn_causes,
910 protocol_iteration: self.protocol_iteration,
911 protocol_run_offset: self.protocol_run_offset,
912 termination: &self.termination,
913 }
914 }
915
916 fn next_id(&mut self) -> EffectId {
917 let id = EffectId(self.next_effect_id);
918 self.next_effect_id += 1;
919 id
920 }
921
922 fn next_synthetic_message_id(&mut self, scope: &str) -> String {
923 let id = format!(
924 "m_sansio_{}_{}_{}",
925 self.protocol_run_offset, scope, self.next_synthetic_message_id
926 );
927 self.next_synthetic_message_id += 1;
928 id
929 }
930
931 fn emit(&mut self, event: SessionEvent) {
932 self.pending_effects.push_back(Effect::Emit(event));
933 }
934
935 fn emit_progress(&mut self) {
936 let event_delta = self.next_event_delta();
937 self.pending_effects.push_back(Effect::Progress {
938 messages: self.messages.clone(),
939 event_delta,
940 protocol_iteration: self.protocol_iteration,
941 });
942 }
943
944 pub fn fail_turn(&mut self, event: SessionEvent) {
945 self.emit(event);
946 self.finish(TurnOutcome::Stopped(TurnStop::RuntimeError));
947 }
948
949 pub fn finish_with_outcome(&mut self, outcome: TurnOutcome) {
950 self.finish(outcome);
951 }
952
953 fn finish(&mut self, outcome: TurnOutcome) {
954 self.emit(SessionEvent::TurnOutcome { outcome });
955 self.emit(SessionEvent::Done);
956 let msgs = std::mem::take(&mut self.messages);
957 let event_delta = self.next_event_delta();
958 let protocol_iteration = self.protocol_iteration;
959 self.state = MachineState::Finished;
960 self.pending_effects.push_back(Effect::Done {
961 messages: msgs,
962 event_delta,
963 protocol_iteration,
964 });
965 }
966
967 fn next_event_delta(&mut self) -> Vec<SessionEventRecord<M::Event>> {
968 if self.progress_event_cursor >= self.events.len() {
969 self.progress_event_cursor = self.events.len();
970 return Vec::new();
971 }
972 let delta = self.events[self.progress_event_cursor..].to_vec();
973 self.progress_event_cursor = self.events.len();
974 delta
975 }
976
977 pub fn poll_effect(&mut self) -> Option<Effect<M>> {
980 if let Some(effect) = self.pending_effects.pop_front() {
981 return Some(effect);
982 }
983 if self.active_effect_redelivery {
984 self.active_effect_redelivery = false;
985 if let Some(effect) = self.state.outstanding_effect() {
986 return Some(effect);
987 }
988 }
989
990 match &self.state {
991 MachineState::PreparingProtocol => {
992 self.prepare_protocol();
993 self.pending_effects.pop_front()
994 }
995 MachineState::PrepareIteration => {
996 self.prepare_protocol_iteration();
997 self.pending_effects.pop_front()
998 }
999 _ => None,
1000 }
1001 }
1002
1003 fn prepare_protocol(&mut self) {
1006 if self.config.sync_execution_surface {
1007 let id = self.next_id();
1008 self.state = MachineState::WaitingExecutionSurface {
1009 effect_id: id,
1010 update_machine_config: false,
1011 };
1012 self.pending_effects
1013 .push_back(Effect::SyncExecutionSurface {
1014 id,
1015 update_machine_config: false,
1016 });
1017 return;
1018 }
1019
1020 self.prepare_protocol_iteration();
1021 }
1022
1023 fn prepare_protocol_iteration(&mut self) {
1024 if self.config.sync_execution_surface
1025 && self.synced_protocol_iteration != Some(self.protocol_iteration)
1026 {
1027 let id = self.next_id();
1028 self.state = MachineState::WaitingExecutionSurface {
1029 effect_id: id,
1030 update_machine_config: true,
1031 };
1032 self.pending_effects
1033 .push_back(Effect::SyncExecutionSurface {
1034 id,
1035 update_machine_config: true,
1036 });
1037 return;
1038 }
1039 let actions = {
1040 let driver = Arc::clone(&self.config.protocol_driver);
1041 let ctx = self.driver_context();
1042 driver.prepare_protocol_iteration(ctx)
1043 };
1044 self.apply_actions(actions);
1045 }
1046
1047 fn start_llm_request(
1048 &mut self,
1049 request: Arc<LlmRequest>,
1050 driver_state: Option<M::DriverState>,
1051 ) {
1052 let tool_list = self
1053 .config
1054 .tool_specs
1055 .iter()
1056 .map(|tool| tool.name.as_str())
1057 .collect::<Vec<_>>()
1058 .join(", ");
1059 self.emit(SessionEvent::LlmRequest {
1060 protocol_iteration: self.protocol_iteration,
1061 message_count: self.messages.len(),
1062 tool_list,
1063 });
1064
1065 let id = self.next_id();
1066 self.state = MachineState::WaitingLlm {
1067 effect_id: id,
1068 request: Arc::clone(&request),
1069 driver_state,
1070 };
1071 self.pending_effects
1072 .push_back(Effect::LlmCall { id, request });
1073 }
1074
1075 fn start_tool_calls(&mut self, calls: Vec<PendingToolCall>) {
1076 let effect_id = self.next_id();
1077 self.state = MachineState::WaitingTools {
1078 effect_id,
1079 calls: calls.clone(),
1080 };
1081 self.pending_effects.push_back(Effect::ToolCalls {
1082 id: effect_id,
1083 calls,
1084 });
1085 }
1086
1087 fn start_exec(&mut self, code: String, driver_state: M::DriverState) {
1088 let effect_id = self.next_id();
1089 self.state = MachineState::WaitingExec {
1090 effect_id,
1091 code: code.clone(),
1092 driver_state,
1093 };
1094 self.pending_effects.push_back(Effect::ExecCode {
1095 id: effect_id,
1096 code,
1097 });
1098 }
1099
1100 fn schedule_turn_limit_final(&mut self, message: Message) -> bool {
1101 let Some(_max_turns) = self.termination.turn_limit_final_to_schedule(
1102 self.protocol_iteration,
1103 self.protocol_run_offset,
1104 self.config.max_turns,
1105 ) else {
1106 return false;
1107 };
1108 self.termination.mark_turn_limit_final_scheduled();
1109 self.messages.push(message);
1110 true
1111 }
1112
1113 fn schedule_configured_turn_limit_final(&mut self) -> bool {
1114 let Some(max_turns) = self.termination.turn_limit_final_to_schedule(
1115 self.protocol_iteration,
1116 self.protocol_run_offset,
1117 self.config.max_turns,
1118 ) else {
1119 return false;
1120 };
1121 let message_id = self.next_synthetic_message_id("turn_limit");
1122 let message = (self.config.turn_limit_final_message)(message_id, max_turns);
1123 self.termination.mark_turn_limit_final_scheduled();
1124 self.messages.push(message);
1125 true
1126 }
1127
1128 fn append_event(&mut self, event: SessionEventRecord<M::Event>) {
1129 match event {
1130 SessionEventRecord::Conversation(record) => {
1131 Arc::make_mut(&mut self.events)
1132 .push(SessionEventRecord::Conversation(record.clone()));
1133 self.messages.push(record.to_message());
1134 }
1135 SessionEventRecord::Protocol(protocol_event) => {
1136 Arc::make_mut(&mut self.events).push(SessionEventRecord::Protocol(protocol_event));
1137 }
1138 }
1139 }
1140
    /// Applies driver actions in order.
    ///
    /// Actions that change history or the iteration counter mark progress as
    /// dirty. A single `Progress` effect is queued at the end of the batch, or
    /// just before `Finish`, so that observers see the final state before
    /// `Done`. Actions after a `Finish` are ignored.
    pub fn apply_actions(&mut self, actions: Vec<DriverAction<M>>) {
        let mut progress_dirty = false;
        for action in actions {
            match action {
                DriverAction::Emit(event) => self.emit(event),
                DriverAction::AppendEvents(events) => {
                    // Empty batches do not trigger a progress snapshot.
                    if !events.is_empty() {
                        for event in events {
                            self.append_event(event);
                        }
                        progress_dirty = true;
                    }
                }
                DriverAction::StartLlm {
                    request,
                    driver_state,
                } => self.start_llm_request(request, driver_state),
                DriverAction::StartTools { calls } => self.start_tool_calls(calls),
                DriverAction::StartExec { code, driver_state } => {
                    self.start_exec(code, driver_state)
                }
                DriverAction::StartCheckpoint {
                    checkpoint,
                    on_empty,
                } => self.request_checkpoint(checkpoint, on_empty),
                DriverAction::AdvanceProtocolIteration => {
                    self.protocol_iteration += 1;
                    // Forces a fresh execution-surface sync for the new iteration
                    // when syncing is enabled.
                    self.synced_protocol_iteration = None;
                    progress_dirty = true;
                }
                DriverAction::ScheduleTurnLimitFinal { message } => {
                    if self.schedule_turn_limit_final(message) {
                        progress_dirty = true;
                    }
                }
                DriverAction::Finish(outcome) => {
                    // Flush pending progress before the terminal effects.
                    if progress_dirty {
                        self.emit_progress();
                        progress_dirty = false;
                    }
                    self.finish(outcome);
                    break;
                }
            }
        }
        if progress_dirty {
            self.emit_progress();
        }
    }
1190
1191 pub fn handle_response(&mut self, response: Response) {
1193 self.active_effect_redelivery = false;
1194 match response {
1195 Response::ExecutionSurfaceSynced { id, result } => {
1196 self.handle_execution_surface_synced(id, result)
1197 }
1198 Response::LlmComplete {
1199 id,
1200 result,
1201 text_streamed,
1202 } => self.handle_llm_complete(id, result, text_streamed),
1203 Response::ToolResults { id, results } => self.handle_tool_results(id, results),
1204 Response::ExecResult { id, result } => self.handle_exec_result(id, result),
1205 Response::Checkpoint { id, delivery } => self.handle_checkpoint(id, delivery),
1206 }
1207 }
1208
1209 fn request_checkpoint(&mut self, checkpoint: CheckpointKind, on_empty: CheckpointResumeAction) {
1210 let id = self.next_id();
1211 self.state = MachineState::WaitingCheckpoint {
1212 effect_id: id,
1213 checkpoint,
1214 on_empty,
1215 };
1216 self.pending_effects
1217 .push_back(Effect::Checkpoint { id, checkpoint });
1218 }
1219
1220 fn handle_execution_surface_synced(
1221 &mut self,
1222 id: EffectId,
1223 result: Result<Option<ExecutionSurfaceSync>, String>,
1224 ) {
1225 let (waiting_id, waiting_update_machine_config) =
1226 match std::mem::replace(&mut self.state, MachineState::Finished) {
1227 MachineState::WaitingExecutionSurface {
1228 effect_id,
1229 update_machine_config,
1230 } => (effect_id, update_machine_config),
1231 other => {
1232 self.state = other;
1233 return;
1234 }
1235 };
1236 if waiting_id != id {
1237 self.state = MachineState::WaitingExecutionSurface {
1238 effect_id: waiting_id,
1239 update_machine_config: waiting_update_machine_config,
1240 };
1241 return;
1242 }
1243
1244 match result {
1245 Ok(update) => {
1246 if let Some(update) = update {
1247 self.config.system_prompt = update.system_prompt;
1248 self.config.tool_specs = update.tool_specs;
1249 }
1250 self.synced_protocol_iteration = Some(self.protocol_iteration);
1251 self.state = MachineState::PrepareIteration;
1252 }
1253 Err(error) => {
1254 self.fail_turn(make_error_event(
1255 "execution_surface",
1256 Some("reconfigure_failed"),
1257 format!("Failed to refresh execution surface: {error}"),
1258 Some(error),
1259 ));
1260 }
1261 }
1262 }
1263
1264 fn append_checkpoint_messages(&mut self, plugin_messages: &[PluginMessage], transient: bool) {
1265 let mut appended = Vec::new();
1266 for message in plugin_messages
1267 .iter()
1268 .filter(|message| matches!(message.role, MessageRole::User | MessageRole::System))
1269 {
1270 let message_id = self.next_synthetic_message_id("checkpoint");
1271 let mut parts = if message.parts.is_empty() {
1272 vec![Part {
1273 id: format!("{message_id}.p0"),
1274 kind: PartKind::Text,
1275 content: message.content.clone(),
1276 attachment: None,
1277 tool_call_id: None,
1278 tool_name: None,
1279 tool_replay: None,
1280 prune_state: PruneState::Intact,
1281 reasoning_meta: None,
1282 response_meta: None,
1283 }]
1284 } else {
1285 message.parts.clone()
1286 };
1287 reassign_part_ids(&message_id, &mut parts);
1288 appended.push(Message {
1289 id: message_id.clone(),
1290 role: message.role,
1291 parts: Arc::new(parts),
1292 origin: message.origin.clone().or_else(|| {
1293 Some(MessageOrigin::Plugin {
1294 plugin_id: "plugin".to_string(),
1295 transient,
1296 })
1297 }),
1298 });
1299 }
1300 if !appended.is_empty() {
1301 self.messages.extend(appended);
1302 }
1303 }
1304
1305 fn append_turn_causes(&mut self, causes: Vec<TurnCause>) {
1306 if causes.is_empty() {
1307 return;
1308 }
1309 let mut existing_ids = self
1310 .turn_causes
1311 .iter()
1312 .map(|cause| cause.id.clone())
1313 .collect::<HashSet<_>>();
1314 for cause in causes {
1315 if !existing_ids.insert(cause.id.clone()) {
1316 continue;
1317 }
1318 self.messages.push(cause.to_event_message());
1319 self.turn_causes.push(cause);
1320 }
1321 }
1322
1323 fn handle_checkpoint(&mut self, id: EffectId, delivery: CheckpointDelivery) {
1324 let (effect_id, checkpoint, on_empty) =
1325 match std::mem::replace(&mut self.state, MachineState::Finished) {
1326 MachineState::WaitingCheckpoint {
1327 effect_id,
1328 checkpoint,
1329 on_empty,
1330 } => (effect_id, checkpoint, on_empty),
1331 other => {
1332 self.state = other;
1333 return;
1334 }
1335 };
1336 if effect_id != id {
1337 self.state = MachineState::WaitingCheckpoint {
1338 effect_id,
1339 checkpoint,
1340 on_empty,
1341 };
1342 return;
1343 }
1344
1345 if !delivery.messages.is_empty()
1346 || !delivery.transient_messages.is_empty()
1347 || !delivery.turn_causes.is_empty()
1348 {
1349 self.append_checkpoint_messages(&delivery.messages, false);
1350 self.append_checkpoint_messages(&delivery.transient_messages, true);
1351 self.append_turn_causes(delivery.turn_causes);
1352 if matches!(checkpoint, CheckpointKind::BeforeCompletion) {
1353 self.protocol_iteration += 1;
1354 if self.termination.should_force_exit_after_grace_turn() {
1355 self.emit_progress();
1356 self.finish(TurnOutcome::Stopped(TurnStop::MaxTurns));
1357 return;
1358 }
1359 self.schedule_configured_turn_limit_final();
1360 }
1361 self.state = MachineState::PrepareIteration;
1362 self.emit_progress();
1363 return;
1364 }
1365
1366 match on_empty {
1367 CheckpointResumeAction::PrepareIteration => {
1368 self.state = MachineState::PrepareIteration;
1369 }
1370 CheckpointResumeAction::Finish(outcome) => self.finish(outcome),
1371 }
1372 }
1373
1374 fn take_waiting_llm_state(&mut self, id: EffectId) -> Option<WaitingLlmState<M>> {
1375 match std::mem::replace(&mut self.state, MachineState::Finished) {
1376 MachineState::WaitingLlm {
1377 effect_id,
1378 request,
1379 driver_state,
1380 } if effect_id == id => Some(WaitingLlmState {
1381 request,
1382 driver_state,
1383 }),
1384 other => {
1385 self.state = other;
1386 None
1387 }
1388 }
1389 }
1390
1391 fn handle_llm_complete(
1392 &mut self,
1393 id: EffectId,
1394 result: Result<LlmResponse, LlmCallError>,
1395 text_streamed: bool,
1396 ) {
1397 let Some(waiting) = self.take_waiting_llm_state(id) else {
1398 return;
1399 };
1400 match result {
1401 Err(error) => {
1402 self.emit_llm_error(error);
1403 }
1404 Ok(mut llm_response) => {
1405 refine_terminal_reason_for_context_window(
1409 &mut llm_response,
1410 self.config.max_context_tokens,
1411 );
1412 self.record_llm_usage(&llm_response, self.llm_response_text(&llm_response));
1413 if self.handle_terminal_llm_response(&llm_response, text_streamed) {
1414 return;
1415 }
1416 let actions = {
1417 let driver = Arc::clone(&self.config.protocol_driver);
1418 let ctx = self.driver_context();
1419 driver.handle_llm_success(ctx, waiting, llm_response, text_streamed)
1420 };
1421 self.apply_actions(actions);
1422 }
1423 }
1424 }
1425
1426 fn handle_terminal_llm_response(
1427 &mut self,
1428 llm_response: &LlmResponse,
1429 text_streamed: bool,
1430 ) -> bool {
1431 let outcome = match llm_response.terminal_reason {
1432 LlmTerminalReason::OutputLimit => TurnOutcome::Stopped(TurnStop::Incomplete),
1433 LlmTerminalReason::ContextOverflow => TurnOutcome::Stopped(TurnStop::ProviderError),
1434 LlmTerminalReason::ContentFilter => TurnOutcome::Stopped(TurnStop::ProviderError),
1435 LlmTerminalReason::ProviderError => TurnOutcome::Stopped(TurnStop::ProviderError),
1436 LlmTerminalReason::Cancelled => TurnOutcome::Stopped(TurnStop::Cancelled),
1437 LlmTerminalReason::Stop | LlmTerminalReason::ToolUse | LlmTerminalReason::Unknown => {
1438 return false;
1439 }
1440 };
1441
1442 if !text_streamed && !llm_response.full_text.is_empty() {
1443 self.emit(SessionEvent::TextDelta {
1444 content: llm_response.full_text.clone(),
1445 });
1446 }
1447 self.emit(SessionEvent::LlmResponse {
1448 protocol_iteration: self.protocol_iteration,
1449 content: llm_response.full_text.clone(),
1450 duration_ms: 0,
1451 });
1452 let reason = llm_response.terminal_reason;
1453 let diagnostic = llm_response
1454 .terminal_diagnostic
1455 .clone()
1456 .unwrap_or_else(|| format!("Model call ended with terminal reason {reason:?}."));
1457 self.emit(SessionEvent::Error {
1458 message: diagnostic.clone(),
1459 envelope: Some(crate::session_model::make_error_envelope(
1460 "llm_provider",
1461 Some(reason.code()),
1462 Some(reason),
1463 diagnostic,
1464 None,
1465 )),
1466 });
1467 self.finish(outcome);
1468 true
1469 }
1470
1471 fn llm_response_text<'a>(&self, llm_response: &'a LlmResponse) -> &'a str {
1472 &llm_response.full_text
1473 }
1474
1475 fn llm_response_debug_parts(&self, llm_response: &LlmResponse) -> Option<Value> {
1476 let parts = llm_response
1477 .parts
1478 .iter()
1479 .filter_map(|part| match part {
1480 LlmOutputPart::Text { text, .. } if !text.is_empty() => Some(serde_json::json!({
1481 "type": "text",
1482 "text": text,
1483 })),
1484 LlmOutputPart::Text { .. } => None,
1485 LlmOutputPart::Reasoning {
1486 text,
1487 replay,
1488 } => Some(serde_json::json!({
1489 "type": "reasoning",
1490 "id": replay.as_ref().and_then(|meta| meta.item_id.as_ref()),
1491 "summary": replay.as_ref().map(|meta| &meta.summary),
1492 "text": text,
1493 "has_encrypted": replay.as_ref().is_some_and(|meta| meta.encrypted_content.is_some() || meta.signature.is_some()),
1494 "redacted": replay.as_ref().is_some_and(|meta| meta.redacted),
1495 })),
1496 LlmOutputPart::ToolCall {
1497 call_id,
1498 tool_name,
1499 input_json,
1500 replay,
1501 } => Some(serde_json::json!({
1502 "type": "tool_call",
1503 "call_id": call_id,
1504 "tool_name": tool_name,
1505 "input_json": input_json,
1506 "id": replay.as_ref().and_then(|meta| meta.item_id.as_ref()),
1507 "has_opaque": replay.as_ref().is_some_and(|meta| meta.opaque.is_some()),
1508 })),
1509 })
1510 .collect::<Vec<_>>();
1511 (!parts.is_empty()).then_some(Value::Array(parts))
1512 }
1513
1514 fn record_llm_usage(&mut self, llm_response: &LlmResponse, response_text: &str) {
1515 let usage = token_usage_from_llm_usage(&llm_response.usage);
1516 self.cumulative_usage.add(&usage);
1517 self.emit(SessionEvent::TokenUsage {
1518 protocol_iteration: self.protocol_iteration,
1519 usage: usage.clone(),
1520 cumulative: self.cumulative_usage.clone(),
1521 });
1522 if self.config.emit_llm_trace {
1523 let response_parts = self.llm_response_debug_parts(llm_response);
1524 self.pending_effects.push_back(Effect::Log {
1525 event: LogEvent::LlmDebug {
1526 session_id: self.config.session_id.clone(),
1527 protocol_iteration: self.protocol_iteration,
1528 usage,
1529 provider_usage: llm_response.provider_usage.clone(),
1530 request_body: llm_response.request_body.clone(),
1531 response_text: response_text.to_string(),
1532 response_parts,
1533 },
1534 });
1535 }
1536 }
1537
1538 fn record_llm_error(&mut self, error: &LlmCallError) {
1539 if self.config.emit_llm_trace {
1540 self.pending_effects.push_back(Effect::Log {
1541 event: LogEvent::LlmError {
1542 session_id: self.config.session_id.clone(),
1543 protocol_iteration: self.protocol_iteration,
1544 request_body: error.request_body.clone(),
1545 message: error.message.clone(),
1546 retryable: error.retryable,
1547 raw: error.raw.clone(),
1548 code: error.code.clone(),
1549 terminal_reason: error.terminal_reason,
1550 },
1551 });
1552 }
1553 }
1554
1555 fn emit_llm_error(&mut self, error: LlmCallError) {
1556 self.record_llm_error(&error);
1557 self.emit(SessionEvent::Error {
1558 message: format!("LLM error: {}", error.message),
1559 envelope: Some(crate::session_model::make_error_envelope(
1560 "llm_provider",
1561 error.code.as_deref(),
1562 Some(error.terminal_reason),
1563 format!("LLM error: {}", error.message),
1564 error.raw,
1565 )),
1566 });
1567 self.finish(TurnOutcome::Stopped(TurnStop::ProviderError));
1568 }
1569
1570 fn handle_tool_results(&mut self, id: EffectId, completed: Vec<CompletedToolCall>) {
1571 let (waiting_effect_id, waiting_calls) =
1572 match std::mem::replace(&mut self.state, MachineState::Finished) {
1573 MachineState::WaitingTools { effect_id, calls } => (effect_id, calls),
1574 other => {
1575 self.state = other;
1576 return;
1577 }
1578 };
1579
1580 if waiting_effect_id != id {
1581 self.state = MachineState::WaitingTools {
1582 effect_id: waiting_effect_id,
1583 calls: waiting_calls,
1584 };
1585 return;
1586 }
1587
1588 for outcome in &completed {
1589 self.emit(SessionEvent::ToolCall {
1590 call_id: Some(outcome.call_id.clone()),
1591 name: outcome.tool_name.clone(),
1592 args: outcome.args.clone(),
1593 output: outcome.output.clone(),
1594 duration_ms: outcome.duration_ms,
1595 });
1596 }
1597
1598 let actions = {
1599 let driver = Arc::clone(&self.config.protocol_driver);
1600 let ctx = self.driver_context();
1601 driver.handle_tool_results(ctx, completed)
1602 };
1603 self.apply_actions(actions);
1604 }
1605
1606 fn take_waiting_exec_state(&mut self, id: EffectId) -> Option<WaitingExecState<M>> {
1607 match std::mem::replace(&mut self.state, MachineState::Finished) {
1608 MachineState::WaitingExec {
1609 effect_id,
1610 code: _,
1611 driver_state,
1612 } if effect_id == id => Some(WaitingExecState { driver_state }),
1613 other => {
1614 self.state = other;
1615 None
1616 }
1617 }
1618 }
1619
1620 fn handle_exec_result(&mut self, id: EffectId, result: Result<crate::ExecResponse, String>) {
1621 let Some(waiting) = self.take_waiting_exec_state(id) else {
1622 return;
1623 };
1624 let actions = {
1625 let driver = Arc::clone(&self.config.protocol_driver);
1626 let ctx = self.driver_context();
1627 driver.handle_exec_result(ctx, waiting, result)
1628 };
1629 self.apply_actions(actions);
1630 }
1631}
1632
1633fn token_usage_from_llm_usage(usage: &crate::llm::types::LlmUsage) -> TokenUsage {
1634 TokenUsage {
1635 input_tokens: usage.input_tokens,
1636 output_tokens: usage.output_tokens,
1637 cached_input_tokens: usage.cached_input_tokens,
1638 reasoning_tokens: usage.reasoning_tokens,
1639 }
1640}
1641
1642fn refine_terminal_reason_for_context_window(
1650 response: &mut LlmResponse,
1651 max_context_tokens: Option<usize>,
1652) {
1653 if response.terminal_reason != LlmTerminalReason::OutputLimit {
1654 return;
1655 }
1656 if response.usage.output_tokens != 0 {
1657 return;
1658 }
1659 let Some(max_context_tokens) = max_context_tokens.filter(|value| *value > 0) else {
1660 return;
1661 };
1662 let prompt_tokens = response
1663 .usage
1664 .input_tokens
1665 .saturating_add(response.usage.cached_input_tokens)
1666 .max(0) as usize;
1667 if prompt_tokens >= max_context_tokens.saturating_mul(95) / 100 {
1668 response.terminal_reason = LlmTerminalReason::ContextOverflow;
1669 response.terminal_diagnostic = Some(
1670 "Model produced no output because the prompt reached the configured context window."
1671 .to_string(),
1672 );
1673 }
1674}
1675
1676#[cfg(test)]
1677mod tests;