Skip to main content

lash_sansio/sansio/
mod.rs

1//! Sans-IO state machine for session turns.
2//!
3//! `TurnMachine` owns the generic effect engine. Protocol-specific behavior
4//! lives behind `ProtocolDriverHandle`, which returns declarative
5//! `DriverAction`s that the machine applies.
6
7use std::collections::{HashSet, VecDeque};
8use std::fmt::Debug;
9use std::sync::Arc;
10
11use serde::Serialize;
12use serde::de::DeserializeOwned;
13use serde_json::Value;
14
15use crate::llm::types::{
16    LlmAttachment, LlmOutputPart, LlmRequest, LlmResponse, LlmTerminalReason, LlmToolChoice,
17    LlmToolSpec, ProviderReplayMeta,
18};
19use crate::session_model::message::MessageOrigin;
20use crate::session_model::{
21    Message, MessageRole, MessageSequence, Part, PartKind, PruneState, SessionEvent,
22    SessionEventRecord, TokenUsage, TurnTerminationPolicyState, make_error_event,
23    reassign_part_ids, render_prompt,
24};
25use crate::{
26    CheckpointKind, ModelToolReturn, PluginMessage, ToolCallOutput, TurnOutcome, TurnStop,
27};
28
29// ─── Public types ───
30
31pub trait TurnProtocol: Send + Sync + 'static {
32    type Event: Clone + Serialize + DeserializeOwned + Debug + Send + Sync + 'static;
33    type Termination: Clone + Default + Debug + Send + Sync + 'static;
34    type DriverState: Clone + Default + Serialize + DeserializeOwned + Debug + Send + Sync + 'static;
35}
36
37#[derive(Clone, Debug, Serialize, serde::Deserialize)]
38pub struct UnitTurnProtocol;
39
40impl TurnProtocol for UnitTurnProtocol {
41    type Event = ();
42    type Termination = ();
43    type DriverState = serde_json::Value;
44}
45
/// Opaque identifier linking an effect to its response.
///
/// Ids are allocated by `TurnMachine` from a per-machine counter that only
/// ever increases. A [`Response`] names the effect it answers by echoing this id.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, serde::Deserialize)]
pub struct EffectId(pub u64);
49
/// A tool call the driver has scheduled for the host to execute; delivered in
/// [`Effect::ToolCalls`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct PendingToolCall {
    /// Identifier of this tool call.
    pub call_id: String,
    /// Name of the tool to invoke.
    pub tool_name: String,
    /// Tool arguments as a JSON value.
    pub args: Value,
    /// Opaque provider replay state carried through for the next request.
    pub replay: Option<ProviderReplayMeta>,
}

/// The outcome of one executed tool call, returned by the host in
/// [`Response::ToolResults`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct CompletedToolCall {
    /// Identifier of the tool call this result belongs to.
    pub call_id: String,
    /// Name of the tool that was invoked.
    pub tool_name: String,
    /// Arguments the tool was invoked with.
    pub args: Value,
    /// Output produced by the tool.
    pub output: ToolCallOutput,
    /// NOTE(review): presumably the value returned to the model for this call;
    /// confirm against `ModelToolReturn`.
    pub model_return: ModelToolReturn,
    /// Execution duration in milliseconds, as reported by the host.
    pub duration_ms: u64,
    /// See [`PendingToolCall::replay`].
    pub replay: Option<ProviderReplayMeta>,
}
70
71#[derive(Clone, Debug, PartialEq, Eq, Serialize, serde::Deserialize)]
72pub struct TurnCause {
73    pub id: String,
74    pub event_type: String,
75    pub origin: MessageOrigin,
76    pub text: String,
77}
78
79impl TurnCause {
80    pub fn to_event_message(&self) -> Message {
81        Message {
82            id: self.id.clone(),
83            role: MessageRole::Event,
84            parts: Arc::new(vec![Part {
85                id: format!("{}.p0", self.id),
86                kind: PartKind::Text,
87                content: self.text.clone(),
88                attachment: None,
89                tool_call_id: None,
90                tool_name: None,
91                tool_replay: None,
92                prune_state: PruneState::Intact,
93                reasoning_meta: None,
94                response_meta: None,
95            }]),
96            origin: Some(self.origin.clone()),
97        }
98    }
99}
100
/// What a host checkpoint hands back to the machine in
/// [`Response::Checkpoint`].
#[derive(Clone, Debug, Default, Serialize, serde::Deserialize)]
pub struct CheckpointDelivery {
    /// Plugin messages to inject.
    pub messages: Vec<PluginMessage>,
    /// Plugin messages flagged as transient. NOTE(review): presumably these
    /// are not kept in durable history; confirm against the checkpoint handler.
    pub transient_messages: Vec<PluginMessage>,
    /// Turn causes delivered at this checkpoint.
    pub turn_causes: Vec<TurnCause>,
}
107
108pub fn render_turn_causes_prompt(causes: &[TurnCause]) -> Option<String> {
109    if causes.is_empty() {
110        return None;
111    }
112
113    let mut rendered = String::from("=== TURN EVENTS ===");
114    for (index, cause) in causes.iter().enumerate() {
115        rendered.push_str("\n\n");
116        rendered.push_str(&format!(
117            "--- event[{index}] · {} · {} ---\n",
118            cause.event_type, cause.id
119        ));
120        rendered.push_str("Origin: ");
121        rendered.push_str(&render_message_origin(&cause.origin));
122        rendered.push_str("\n\n");
123        rendered.push_str(cause.text.trim());
124    }
125    Some(rendered)
126}
127
128fn render_message_origin(origin: &MessageOrigin) -> String {
129    match origin {
130        MessageOrigin::Plugin {
131            plugin_id,
132            transient,
133        } => {
134            if *transient {
135                format!("plugin {plugin_id} (transient)")
136            } else {
137                format!("plugin {plugin_id}")
138            }
139        }
140        MessageOrigin::Process {
141            process_id,
142            event_type,
143            sequence,
144            wake_id,
145            ..
146        } => match wake_id {
147            Some(wake_id) => {
148                format!("process {process_id} {event_type} #{sequence} ({wake_id})")
149            }
150            None => format!("process {process_id} {event_type} #{sequence}"),
151        },
152    }
153}
154
/// Structured log record handed to the host through [`Effect::Log`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub enum LogEvent {
    /// Trace of an LLM call that produced a response.
    LlmDebug {
        session_id: String,
        /// Protocol iteration the call belongs to.
        protocol_iteration: usize,
        /// Token usage for the call.
        usage: TokenUsage,
        /// Raw provider usage payload, if available.
        provider_usage: Option<Value>,
        /// Serialized request body, if captured.
        request_body: Option<String>,
        /// Response text.
        response_text: String,
        /// Response parts as JSON, if captured.
        response_parts: Option<Value>,
    },
    /// A failed LLM call. Carries the same error fields as [`LlmCallError`],
    /// plus the session id and protocol iteration.
    LlmError {
        session_id: String,
        protocol_iteration: usize,
        request_body: Option<String>,
        message: String,
        retryable: bool,
        raw: Option<String>,
        code: Option<String>,
        terminal_reason: LlmTerminalReason,
    },
}
177
/// An effect the host must fulfil.
///
/// The id-carrying variants are `SyncExecutionSurface`, `LlmCall`,
/// `CancelLlm`, `ToolCalls`, `ExecCode` and `Checkpoint`. `Log`, `Emit`,
/// `Progress` and `Done` carry no id and expect no response.
//
// `Clone` is implemented by hand below rather than derived: the derive would
// demand `M: Clone`, but only `M::Event` is ever cloned (and `TurnProtocol`
// already guarantees `Event: Clone`), so a manual impl keeps `Effect<M>`
// cloneable for every protocol — which the turn checkpoint relies on.
#[derive(Debug, Serialize, serde::Deserialize)]
#[allow(clippy::large_enum_variant)]
pub enum Effect<M: TurnProtocol = UnitTurnProtocol> {
    /// Sync the live execution surface before the turn proceeds.
    ///
    /// `update_machine_config` is only needed after the turn has
    /// already advanced at least once and the host may need to swap in
    /// a refreshed system prompt or tool schema for the next
    /// protocol iteration. Initial syncs are host-only because the machine was
    /// already constructed from a fresh execution surface.
    SyncExecutionSurface {
        id: EffectId,
        update_machine_config: bool,
    },
    /// Start an LLM call with `request`.
    LlmCall {
        id: EffectId,
        request: Arc<LlmRequest>,
    },
    /// Cancel an in-progress LLM stream.
    ///
    /// NOTE(review): `id` presumably names the `LlmCall` being cancelled;
    /// confirm. [`Response`] has no variant answering this effect.
    CancelLlm { id: EffectId },
    /// Execute one or more driver-scheduled tool calls.
    ToolCalls {
        id: EffectId,
        calls: Vec<PendingToolCall>,
    },
    /// Execute a protocol-owned code block.
    ExecCode { id: EffectId, code: String },
    /// Run a host/plugin checkpoint before the machine continues or completes.
    Checkpoint {
        id: EffectId,
        checkpoint: CheckpointKind,
    },
    /// Host-implemented fire-and-forget logging.
    Log { event: LogEvent },
    /// Fire-and-forget event (no response needed).
    Emit(SessionEvent),
    /// Prompt-history progress that may be durably persisted by the host.
    ///
    /// This is separate from [`SessionEvent`]: UI stream events can be partial,
    /// duplicated, or display-only, while `Progress` is emitted only after the
    /// state machine has applied semantic message or protocol-step changes.
    Progress {
        /// Full message history at this point.
        messages: MessageSequence,
        /// Event records produced by the machine's `next_event_delta`.
        /// Presumably these are the records appended since the previous delta;
        /// confirm.
        event_delta: Vec<SessionEventRecord<M::Event>>,
        /// Protocol iteration at the time of emission.
        protocol_iteration: usize,
    },
    /// Turn is done; carries the final message history.
    Done {
        messages: MessageSequence,
        event_delta: Vec<SessionEventRecord<M::Event>>,
        protocol_iteration: usize,
    },
}
238
239impl<M: TurnProtocol> Clone for Effect<M> {
240    fn clone(&self) -> Self {
241        match self {
242            Self::SyncExecutionSurface {
243                id,
244                update_machine_config,
245            } => Self::SyncExecutionSurface {
246                id: *id,
247                update_machine_config: *update_machine_config,
248            },
249            Self::LlmCall { id, request } => Self::LlmCall {
250                id: *id,
251                request: Arc::clone(request),
252            },
253            Self::CancelLlm { id } => Self::CancelLlm { id: *id },
254            Self::ToolCalls { id, calls } => Self::ToolCalls {
255                id: *id,
256                calls: calls.clone(),
257            },
258            Self::ExecCode { id, code } => Self::ExecCode {
259                id: *id,
260                code: code.clone(),
261            },
262            Self::Checkpoint { id, checkpoint } => Self::Checkpoint {
263                id: *id,
264                checkpoint: *checkpoint,
265            },
266            Self::Log { event } => Self::Log {
267                event: event.clone(),
268            },
269            Self::Emit(event) => Self::Emit(event.clone()),
270            Self::Progress {
271                messages,
272                event_delta,
273                protocol_iteration,
274            } => Self::Progress {
275                messages: messages.clone(),
276                event_delta: event_delta.clone(),
277                protocol_iteration: *protocol_iteration,
278            },
279            Self::Done {
280                messages,
281                event_delta,
282                protocol_iteration,
283            } => Self::Done {
284                messages: messages.clone(),
285                event_delta: event_delta.clone(),
286                protocol_iteration: *protocol_iteration,
287            },
288        }
289    }
290}
291
292impl<M: TurnProtocol> Effect<M> {
293    fn id(&self) -> Option<EffectId> {
294        match self {
295            Self::SyncExecutionSurface { id, .. }
296            | Self::LlmCall { id, .. }
297            | Self::CancelLlm { id }
298            | Self::ToolCalls { id, .. }
299            | Self::ExecCode { id, .. }
300            | Self::Checkpoint { id, .. } => Some(*id),
301            Self::Log { .. } | Self::Emit(_) | Self::Progress { .. } | Self::Done { .. } => None,
302        }
303    }
304}
305
/// Error details from a failed LLM call, returned by the host in
/// [`Response::LlmComplete`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct LlmCallError {
    /// Human-readable error message.
    pub message: String,
    /// Whether the failure is considered retryable.
    pub retryable: bool,
    /// Raw error payload, if available.
    pub raw: Option<String>,
    /// Error code, if available.
    pub code: Option<String>,
    /// Terminal reason reported for the call.
    pub terminal_reason: LlmTerminalReason,
    /// Serialized request body, if captured.
    pub request_body: Option<String>,
}
316
/// A response to a previously emitted effect.
///
/// Each variant carries the [`EffectId`] of the effect it answers.
pub enum Response {
    /// Live execution surface sync completed (answers
    /// [`Effect::SyncExecutionSurface`]). `Ok(None)` means no refreshed surface
    /// was supplied.
    ExecutionSurfaceSynced {
        id: EffectId,
        result: Result<Option<ExecutionSurfaceSync>, String>,
    },
    /// Full LLM response (answers [`Effect::LlmCall`]).
    LlmComplete {
        id: EffectId,
        result: Result<LlmResponse, LlmCallError>,
        /// When true, text deltas were already emitted during streaming,
        /// so the driver should skip emitting `TextDelta` events.
        text_streamed: bool,
    },
    /// Native tool results (answers [`Effect::ToolCalls`]).
    ToolResults {
        id: EffectId,
        results: Vec<CompletedToolCall>,
    },
    /// Result of executing a protocol-owned code block (answers
    /// [`Effect::ExecCode`]).
    ExecResult {
        id: EffectId,
        result: Result<crate::ExecResponse, String>,
    },
    /// Checkpoint result with optional injected messages (answers
    /// [`Effect::Checkpoint`]).
    Checkpoint {
        id: EffectId,
        delivery: CheckpointDelivery,
    },
}
348
/// A refreshed execution surface: the system prompt and tool schema the host
/// supplies in [`Response::ExecutionSurfaceSynced`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct ExecutionSurfaceSync {
    /// System prompt to use from now on.
    pub system_prompt: Arc<str>,
    /// Tool specifications to use from now on.
    pub tool_specs: Arc<Vec<LlmToolSpec>>,
}
354
355pub struct WaitingLlmState<M: TurnProtocol = UnitTurnProtocol> {
356    pub request: Arc<LlmRequest>,
357    driver_state: Option<M::DriverState>,
358}
359
360impl<M: TurnProtocol> WaitingLlmState<M> {
361    pub fn take_driver_state(&mut self) -> Option<M::DriverState> {
362        self.driver_state.take()
363    }
364}
365
366pub struct WaitingExecState<M: TurnProtocol = UnitTurnProtocol> {
367    driver_state: M::DriverState,
368}
369
370impl<M: TurnProtocol> WaitingExecState<M> {
371    pub fn into_driver_state(self) -> M::DriverState {
372        self.driver_state
373    }
374}
375
/// How the machine resumes after a checkpoint whose delivery is empty
/// (the `on_empty` of [`DriverAction::StartCheckpoint`]).
#[derive(Clone, Debug, PartialEq, Serialize, serde::Deserialize)]
pub enum CheckpointResumeAction {
    /// Continue with another protocol iteration.
    PrepareIteration,
    /// Finish the turn with this outcome.
    Finish(TurnOutcome),
}
381
/// A declarative instruction returned by a [`ProtocolDriverHandle`]. The
/// driver never mutates the machine directly; the machine applies these.
#[allow(clippy::large_enum_variant)]
pub enum DriverAction<M: TurnProtocol = UnitTurnProtocol> {
    /// Queue a [`SessionEvent`] for the host.
    Emit(SessionEvent),
    /// Append records to the machine's event log.
    AppendEvents(Vec<SessionEventRecord<M::Event>>),
    /// Start an LLM call with `request`. `driver_state` is handed back through
    /// [`WaitingLlmState`] when the call succeeds.
    StartLlm {
        request: Arc<LlmRequest>,
        driver_state: Option<M::DriverState>,
    },
    /// Ask the host to execute `calls`.
    StartTools {
        calls: Vec<PendingToolCall>,
    },
    /// Execute a protocol-owned code block. `driver_state` is handed back
    /// through [`WaitingExecState`].
    StartExec {
        code: String,
        driver_state: M::DriverState,
    },
    /// Run a host checkpoint. `on_empty` decides how to resume when the
    /// checkpoint delivers nothing.
    StartCheckpoint {
        checkpoint: CheckpointKind,
        on_empty: CheckpointResumeAction,
    },
    /// Advance to the next protocol iteration.
    AdvanceProtocolIteration,
    /// NOTE(review): presumably schedules `message` as the final prompt once
    /// the turn limit is hit (cf. `DriverContextView::turn_limit_final_to_schedule`);
    /// confirm against the action handler.
    ScheduleTurnLimitFinal {
        message: Message,
    },
    /// End the turn with the given outcome.
    Finish(TurnOutcome),
}
407
408pub struct DriverContextView<'a, M: TurnProtocol = UnitTurnProtocol> {
409    config: &'a TurnMachineConfig<M>,
410    messages: &'a MessageSequence,
411    events: &'a [SessionEventRecord<M::Event>],
412    turn_causes: &'a [TurnCause],
413    protocol_iteration: usize,
414    protocol_run_offset: usize,
415    termination: &'a TurnTerminationPolicyState,
416}
417
418impl<'a, M: TurnProtocol> DriverContextView<'a, M> {
419    pub fn project_llm_request(&self, use_tools: bool) -> Arc<LlmRequest> {
420        self.config.projector.project(ProjectorContext {
421            config: self.config,
422            messages: self.messages,
423            events: self.events,
424            turn_causes: self.turn_causes,
425            protocol_iteration: self.protocol_iteration,
426            use_tools,
427        })
428    }
429
430    pub fn protocol_iteration(&self) -> usize {
431        self.protocol_iteration
432    }
433
434    pub fn protocol_run_offset(&self) -> usize {
435        self.protocol_run_offset
436    }
437
438    pub fn max_turns(&self) -> Option<usize> {
439        self.config.max_turns
440    }
441
442    pub fn termination(&self) -> &M::Termination {
443        &self.config.termination
444    }
445
446    pub fn autonomous(&self) -> bool {
447        self.config.autonomous
448    }
449
450    pub fn should_force_exit_after_grace_turn(&self) -> bool {
451        self.termination.should_force_exit_after_grace_turn()
452    }
453
454    pub fn turn_limit_final_to_schedule(&self) -> Option<usize> {
455        self.termination.turn_limit_final_to_schedule(
456            self.protocol_iteration,
457            self.protocol_run_offset,
458            self.config.max_turns,
459        )
460    }
461
462    pub fn messages(&self) -> &MessageSequence {
463        self.messages
464    }
465
466    pub fn events(&self) -> &[SessionEventRecord<M::Event>] {
467        self.events
468    }
469
470    pub fn turn_causes(&self) -> &[TurnCause] {
471        self.turn_causes
472    }
473}
474
/// Inputs handed to a [`ContextProjector`]; built by
/// `DriverContextView::project_llm_request`.
pub struct ProjectorContext<'a, M: TurnProtocol = UnitTurnProtocol> {
    pub config: &'a TurnMachineConfig<M>,
    pub messages: &'a MessageSequence,
    pub events: &'a [SessionEventRecord<M::Event>],
    pub turn_causes: &'a [TurnCause],
    pub protocol_iteration: usize,
    /// Whether the request should advertise the configured tools.
    pub use_tools: bool,
}

/// Turns the machine's history into an [`LlmRequest`].
pub trait ContextProjector<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
    fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest>;
}
487
488#[derive(Clone, Debug, Default)]
489pub struct ChatContextProjector;
490
491impl<M: TurnProtocol> ContextProjector<M> for ChatContextProjector {
492    fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest> {
493        let rendered_prompt = render_messages_for_projector(ctx.messages, ctx.turn_causes);
494        let attachments: Vec<LlmAttachment> = rendered_prompt.attachments;
495        let mut messages = rendered_prompt.messages;
496        if let Some(turn_events) = render_turn_causes_prompt(ctx.turn_causes) {
497            messages.push(crate::llm::types::LlmMessage::text(
498                crate::llm::types::LlmRole::User,
499                Arc::from(turn_events),
500            ));
501        }
502        if !ctx.config.system_prompt.trim().is_empty() {
503            messages.insert(
504                0,
505                crate::llm::types::LlmMessage::text(
506                    crate::llm::types::LlmRole::System,
507                    Arc::clone(&ctx.config.system_prompt),
508                ),
509            );
510        }
511
512        Arc::new(LlmRequest {
513            model: ctx.config.model.clone(),
514            messages,
515            attachments,
516            tools: if ctx.use_tools {
517                Arc::clone(&ctx.config.tool_specs)
518            } else {
519                Arc::new(Vec::new())
520            },
521            tool_choice: if ctx.use_tools {
522                LlmToolChoice::Auto
523            } else {
524                LlmToolChoice::None
525            },
526            model_variant: ctx.config.model_variant.clone(),
527            generation: ctx.config.generation.clone(),
528            session_id: ctx.config.run_session_id.clone(),
529            output_spec: None,
530            stream_events: None,
531            provider_trace: None,
532        })
533    }
534}
535
536fn render_messages_for_projector(
537    messages: &MessageSequence,
538    turn_causes: &[TurnCause],
539) -> crate::RenderedPrompt {
540    if turn_causes.is_empty() {
541        return messages.render_prompt();
542    }
543
544    let active_cause_ids = turn_causes
545        .iter()
546        .map(|cause| cause.id.as_str())
547        .collect::<HashSet<_>>();
548    let filtered = messages
549        .iter()
550        .filter(|message| {
551            !(matches!(message.role, MessageRole::Event)
552                && active_cause_ids.contains(message.id.as_str()))
553        })
554        .cloned()
555        .collect::<Vec<_>>();
556    render_prompt(filtered.as_slice())
557}
558
/// Protocol-specific decision logic plugged into a [`TurnMachine`].
///
/// Every callback receives a read-only [`DriverContextView`] and returns the
/// [`DriverAction`]s the machine should apply, instead of mutating the machine
/// itself.
pub trait ProtocolDriverHandle<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
    /// Decide what a new protocol iteration should do.
    fn prepare_protocol_iteration(&self, ctx: DriverContextView<'_, M>) -> Vec<DriverAction<M>>;
    /// Handle a successful LLM response.
    ///
    /// `waiting` carries the request and any driver state stashed when the
    /// call was started. `text_streamed` is true when text deltas were already
    /// emitted during streaming, in which case the driver should not emit
    /// `TextDelta` events again.
    fn handle_llm_success(
        &self,
        ctx: DriverContextView<'_, M>,
        waiting: WaitingLlmState<M>,
        llm_response: LlmResponse,
        text_streamed: bool,
    ) -> Vec<DriverAction<M>>;
    /// Handle the results of executed tool calls.
    fn handle_tool_results(
        &self,
        ctx: DriverContextView<'_, M>,
        completed: Vec<CompletedToolCall>,
    ) -> Vec<DriverAction<M>>;
    /// Handle the outcome of a protocol-owned code block. `waiting` returns
    /// the driver state stashed when execution was started.
    fn handle_exec_result(
        &self,
        ctx: DriverContextView<'_, M>,
        waiting: WaitingExecState<M>,
        result: Result<crate::ExecResponse, String>,
    ) -> Vec<DriverAction<M>>;
}
580
/// Configuration for a `TurnMachine` instance.
pub struct TurnMachineConfig<M: TurnProtocol = UnitTurnProtocol> {
    /// Protocol driver consulted for every protocol decision.
    pub protocol_driver: Arc<dyn ProtocolDriverHandle<M>>,
    /// Builds LLM requests from history (see `DriverContextView::project_llm_request`).
    pub projector: Arc<dyn ContextProjector<M>>,
    /// NOTE(review): presumably enables `Effect::SyncExecutionSurface`
    /// round-trips; confirm.
    pub sync_execution_surface: bool,
    /// Model name placed in each request by [`ChatContextProjector`].
    pub model: String,
    /// Model context-window size in tokens, if known. Lets the kernel
    /// reclassify a zero-output `OutputLimit` terminal reason as
    /// `ContextOverflow` when the prompt nearly filled the window. `None`
    /// disables that refinement.
    pub max_context_tokens: Option<usize>,
    /// Optional iteration cap, exposed via `DriverContextView::max_turns` and
    /// fed to the turn-limit policy.
    pub max_turns: Option<usize>,
    /// Copied into each request by [`ChatContextProjector`].
    pub model_variant: Option<String>,
    /// Copied into each request by [`ChatContextProjector`].
    pub generation: crate::llm::types::GenerationOptions,
    /// Becomes `LlmRequest::session_id` in [`ChatContextProjector`].
    pub run_session_id: Option<String>,
    /// Exposed to drivers via `DriverContextView::autonomous`.
    pub autonomous: bool,
    /// Tools advertised when a request is projected with tools enabled.
    pub tool_specs: Arc<Vec<LlmToolSpec>>,
    /// System prompt; [`ChatContextProjector`] prepends it when not blank.
    pub system_prompt: Arc<str>,
    /// Session identifier (cf. the `session_id` of [`LogEvent`]).
    pub session_id: String,
    /// NOTE(review): presumably gates `LogEvent::LlmDebug` emission; confirm.
    pub emit_llm_trace: bool,
    /// Protocol termination configuration, exposed via
    /// `DriverContextView::termination`.
    pub termination: M::Termination,
    /// Final message used when the turn limit is reached.
    pub turn_limit_final_message: crate::TurnLimitFinalMessage,
}
604
605// ─── Internal state ───
606
/// Lifecycle state of a [`TurnMachine`].
///
/// Each `Waiting*` variant records the id of the single effect the machine is
/// blocked on (see `outstanding_effect_id` / `outstanding_effect`).
#[derive(Debug, Serialize, serde::Deserialize)]
enum MachineState<M: TurnProtocol = UnitTurnProtocol> {
    /// Initial state of a freshly constructed machine.
    PreparingProtocol,
    /// Blocked on the `Effect::SyncExecutionSurface` with this id.
    WaitingExecutionSurface {
        effect_id: EffectId,
        update_machine_config: bool,
    },
    /// NOTE(review): presumably "ask the driver to prepare the next protocol
    /// iteration"; confirm against the step loop.
    PrepareIteration,
    /// Blocked on the `Effect::LlmCall` with this id. `driver_state` is kept
    /// for the driver's success callback.
    WaitingLlm {
        effect_id: EffectId,
        request: Arc<LlmRequest>,
        driver_state: Option<M::DriverState>,
    },
    /// Blocked on the `Effect::ToolCalls` with this id.
    WaitingTools {
        effect_id: EffectId,
        calls: Vec<PendingToolCall>,
    },
    /// Blocked on the `Effect::ExecCode` with this id. `driver_state` is kept
    /// for the driver's exec callback.
    WaitingExec {
        effect_id: EffectId,
        code: String,
        driver_state: M::DriverState,
    },
    /// Blocked on the `Effect::Checkpoint` with this id. `on_empty` applies if
    /// the delivery is empty.
    WaitingCheckpoint {
        effect_id: EffectId,
        checkpoint: CheckpointKind,
        on_empty: CheckpointResumeAction,
    },
    /// Terminal state; `TurnMachine::is_done` returns true.
    Finished,
}
636
/// Serializable snapshot of a [`TurnMachine`], produced by
/// `TurnMachine::checkpoint` and consumed by
/// `TurnMachine::restore_from_checkpoint`.
///
/// The machine's [`TurnMachineConfig`] is not part of the snapshot. Fields
/// marked `#[serde(default)]` fall back to their default when absent, so
/// snapshots serialized without them still deserialize.
// NOTE(review): unlike the hand-written `Clone` impls for `Effect` and
// `MachineState`, `derive(Clone)` here adds an `M: Clone` bound.
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct TurnCheckpoint<M: TurnProtocol = UnitTurnProtocol> {
    state: MachineState<M>,
    /// Queued effects, excluding the one `state` is blocked on.
    pending_effects: Vec<Effect<M>>,
    next_effect_id: u64,
    #[serde(default)]
    next_synthetic_message_id: u64,
    messages: Vec<Message>,
    events: Vec<SessionEventRecord<M::Event>>,
    #[serde(default)]
    turn_causes: Vec<TurnCause>,
    #[serde(default)]
    progress_event_cursor: usize,
    protocol_iteration: usize,
    protocol_run_offset: usize,
    cumulative_usage: TokenUsage,
    termination: TurnTerminationPolicyState,
    synced_protocol_iteration: Option<usize>,
}
656
657impl<M: TurnProtocol> Clone for MachineState<M> {
658    fn clone(&self) -> Self {
659        match self {
660            Self::PreparingProtocol => Self::PreparingProtocol,
661            Self::WaitingExecutionSurface {
662                effect_id,
663                update_machine_config,
664            } => Self::WaitingExecutionSurface {
665                effect_id: *effect_id,
666                update_machine_config: *update_machine_config,
667            },
668            Self::PrepareIteration => Self::PrepareIteration,
669            Self::WaitingLlm {
670                effect_id,
671                request,
672                driver_state,
673            } => Self::WaitingLlm {
674                effect_id: *effect_id,
675                request: Arc::clone(request),
676                driver_state: driver_state.clone(),
677            },
678            Self::WaitingTools { effect_id, calls } => Self::WaitingTools {
679                effect_id: *effect_id,
680                calls: calls.clone(),
681            },
682            Self::WaitingExec {
683                effect_id,
684                code,
685                driver_state,
686            } => Self::WaitingExec {
687                effect_id: *effect_id,
688                code: code.clone(),
689                driver_state: driver_state.clone(),
690            },
691            Self::WaitingCheckpoint {
692                effect_id,
693                checkpoint,
694                on_empty,
695            } => Self::WaitingCheckpoint {
696                effect_id: *effect_id,
697                checkpoint: *checkpoint,
698                on_empty: on_empty.clone(),
699            },
700            Self::Finished => Self::Finished,
701        }
702    }
703}
704
705impl<M: TurnProtocol> MachineState<M> {
706    fn outstanding_effect_id(&self) -> Option<EffectId> {
707        match self {
708            Self::WaitingExecutionSurface { effect_id, .. }
709            | Self::WaitingLlm { effect_id, .. }
710            | Self::WaitingTools { effect_id, .. }
711            | Self::WaitingExec { effect_id, .. }
712            | Self::WaitingCheckpoint { effect_id, .. } => Some(*effect_id),
713            Self::PreparingProtocol | Self::PrepareIteration | Self::Finished => None,
714        }
715    }
716
717    fn outstanding_effect(&self) -> Option<Effect<M>> {
718        match self {
719            Self::WaitingExecutionSurface {
720                effect_id,
721                update_machine_config,
722            } => Some(Effect::SyncExecutionSurface {
723                id: *effect_id,
724                update_machine_config: *update_machine_config,
725            }),
726            Self::WaitingLlm {
727                effect_id, request, ..
728            } => Some(Effect::LlmCall {
729                id: *effect_id,
730                request: Arc::clone(request),
731            }),
732            Self::WaitingTools { effect_id, calls } => Some(Effect::ToolCalls {
733                id: *effect_id,
734                calls: calls.clone(),
735            }),
736            Self::WaitingExec {
737                effect_id, code, ..
738            } => Some(Effect::ExecCode {
739                id: *effect_id,
740                code: code.clone(),
741            }),
742            Self::WaitingCheckpoint {
743                effect_id,
744                checkpoint,
745                ..
746            } => Some(Effect::Checkpoint {
747                id: *effect_id,
748                checkpoint: *checkpoint,
749            }),
750            Self::PreparingProtocol | Self::PrepareIteration | Self::Finished => None,
751        }
752    }
753}
754
/// Sans-IO state machine for a single session run (multi-turn).
///
/// The machine performs no I/O itself. It queues [`Effect`]s for the host and
/// is fed back the matching [`Response`]s.
pub struct TurnMachine<M: TurnProtocol = UnitTurnProtocol> {
    config: TurnMachineConfig<M>,
    /// Current position in the turn lifecycle.
    state: MachineState<M>,
    /// Effects queued for the host, in the order they were pushed.
    pending_effects: VecDeque<Effect<M>>,
    /// Set by `restore_from_checkpoint` when the effect `state` is blocked on
    /// is not among `pending_effects`. NOTE(review): presumably means it must
    /// be re-delivered to the host; confirm.
    active_effect_redelivery: bool,
    /// Next value handed out by `next_id`.
    next_effect_id: u64,
    /// Counter behind `next_synthetic_message_id`; a fresh machine starts it
    /// at the initial message count.
    next_synthetic_message_id: u64,
    messages: MessageSequence,
    /// Event records, shared with callers through `events()`.
    events: Arc<Vec<SessionEventRecord<M::Event>>>,
    turn_causes: Vec<TurnCause>,
    /// Index into `events`; a fresh machine starts it at `events.len()`.
    /// Presumably the first record not yet reported in a delta; confirm.
    progress_event_cursor: usize,
    protocol_iteration: usize,
    /// Iteration count this run started from; `protocol_iteration` starts here.
    protocol_run_offset: usize,
    cumulative_usage: TokenUsage,
    termination: TurnTerminationPolicyState,
    synced_protocol_iteration: Option<usize>,
}
773
774impl<M: TurnProtocol> TurnMachine<M> {
775    /// Create a new machine in `PrepareIteration` state.
776    pub fn new(
777        config: TurnMachineConfig<M>,
778        messages: Vec<Message>,
779        events: Arc<Vec<SessionEventRecord<M::Event>>>,
780        protocol_run_offset: usize,
781    ) -> Self {
782        Self::new_shared(
783            config,
784            MessageSequence::from_owned(messages),
785            events,
786            protocol_run_offset,
787        )
788    }
789
790    pub fn new_shared(
791        config: TurnMachineConfig<M>,
792        messages: MessageSequence,
793        events: Arc<Vec<SessionEventRecord<M::Event>>>,
794        protocol_run_offset: usize,
795    ) -> Self {
796        Self::new_shared_with_turn_causes(config, messages, events, protocol_run_offset, Vec::new())
797    }
798
799    pub fn new_shared_with_turn_causes(
800        config: TurnMachineConfig<M>,
801        messages: MessageSequence,
802        events: Arc<Vec<SessionEventRecord<M::Event>>>,
803        protocol_run_offset: usize,
804        turn_causes: Vec<TurnCause>,
805    ) -> Self {
806        let next_synthetic_message_id = messages.len() as u64;
807        Self {
808            config,
809            state: MachineState::PreparingProtocol,
810            pending_effects: VecDeque::new(),
811            active_effect_redelivery: false,
812            next_effect_id: 1,
813            next_synthetic_message_id,
814            messages,
815            progress_event_cursor: events.len(),
816            events,
817            turn_causes,
818            protocol_iteration: protocol_run_offset,
819            protocol_run_offset,
820            cumulative_usage: TokenUsage::default(),
821            termination: TurnTerminationPolicyState::new(),
822            synced_protocol_iteration: None,
823        }
824    }
825
826    /// Whether the machine has finished.
827    pub fn is_done(&self) -> bool {
828        matches!(self.state, MachineState::Finished)
829    }
830
831    pub fn messages(&self) -> Arc<Vec<Message>> {
832        self.messages.shared()
833    }
834
835    pub fn events(&self) -> Arc<Vec<SessionEventRecord<M::Event>>> {
836        Arc::clone(&self.events)
837    }
838
839    pub fn message_sequence(&self) -> MessageSequence {
840        self.messages.clone()
841    }
842
843    pub fn protocol_iteration(&self) -> usize {
844        self.protocol_iteration
845    }
846
847    pub fn checkpoint(&self) -> TurnCheckpoint<M> {
848        let active_effect_id = self.state.outstanding_effect_id();
849        let pending_effects = self
850            .pending_effects
851            .iter()
852            .filter(|effect| active_effect_id.is_none_or(|id| effect.id() != Some(id)))
853            .cloned()
854            .collect::<Vec<_>>();
855        TurnCheckpoint {
856            state: self.state.clone(),
857            pending_effects,
858            next_effect_id: self.next_effect_id,
859            next_synthetic_message_id: self.next_synthetic_message_id,
860            messages: self.messages.iter().cloned().collect(),
861            events: self.events.as_ref().clone(),
862            turn_causes: self.turn_causes.clone(),
863            progress_event_cursor: self.progress_event_cursor,
864            protocol_iteration: self.protocol_iteration,
865            protocol_run_offset: self.protocol_run_offset,
866            cumulative_usage: self.cumulative_usage.clone(),
867            termination: self.termination.clone(),
868            synced_protocol_iteration: self.synced_protocol_iteration,
869        }
870    }
871
872    pub fn restore_from_checkpoint(
873        config: TurnMachineConfig<M>,
874        checkpoint: TurnCheckpoint<M>,
875    ) -> Self {
876        let active_effect_id = checkpoint.state.outstanding_effect_id();
877        let pending_effects = checkpoint
878            .pending_effects
879            .into_iter()
880            .collect::<VecDeque<_>>();
881        let active_effect_redelivery = active_effect_id.is_some()
882            && !pending_effects
883                .iter()
884                .any(|effect| effect.id() == active_effect_id);
885        Self {
886            config,
887            state: checkpoint.state,
888            pending_effects,
889            active_effect_redelivery,
890            next_effect_id: checkpoint.next_effect_id,
891            next_synthetic_message_id: checkpoint.next_synthetic_message_id,
892            messages: MessageSequence::from_owned(checkpoint.messages),
893            events: Arc::new(checkpoint.events),
894            turn_causes: checkpoint.turn_causes,
895            progress_event_cursor: checkpoint.progress_event_cursor,
896            protocol_iteration: checkpoint.protocol_iteration,
897            protocol_run_offset: checkpoint.protocol_run_offset,
898            cumulative_usage: checkpoint.cumulative_usage,
899            termination: checkpoint.termination,
900            synced_protocol_iteration: checkpoint.synced_protocol_iteration,
901        }
902    }
903
904    fn driver_context(&self) -> DriverContextView<'_, M> {
905        DriverContextView {
906            config: &self.config,
907            messages: &self.messages,
908            events: self.events.as_slice(),
909            turn_causes: &self.turn_causes,
910            protocol_iteration: self.protocol_iteration,
911            protocol_run_offset: self.protocol_run_offset,
912            termination: &self.termination,
913        }
914    }
915
916    fn next_id(&mut self) -> EffectId {
917        let id = EffectId(self.next_effect_id);
918        self.next_effect_id += 1;
919        id
920    }
921
922    fn next_synthetic_message_id(&mut self, scope: &str) -> String {
923        let id = format!(
924            "m_sansio_{}_{}_{}",
925            self.protocol_run_offset, scope, self.next_synthetic_message_id
926        );
927        self.next_synthetic_message_id += 1;
928        id
929    }
930
931    fn emit(&mut self, event: SessionEvent) {
932        self.pending_effects.push_back(Effect::Emit(event));
933    }
934
935    fn emit_progress(&mut self) {
936        let event_delta = self.next_event_delta();
937        self.pending_effects.push_back(Effect::Progress {
938            messages: self.messages.clone(),
939            event_delta,
940            protocol_iteration: self.protocol_iteration,
941        });
942    }
943
944    pub fn fail_turn(&mut self, event: SessionEvent) {
945        self.emit(event);
946        self.finish(TurnOutcome::Stopped(TurnStop::RuntimeError));
947    }
948
949    pub fn finish_with_outcome(&mut self, outcome: TurnOutcome) {
950        self.finish(outcome);
951    }
952
953    fn finish(&mut self, outcome: TurnOutcome) {
954        self.emit(SessionEvent::TurnOutcome { outcome });
955        self.emit(SessionEvent::Done);
956        let msgs = std::mem::take(&mut self.messages);
957        let event_delta = self.next_event_delta();
958        let protocol_iteration = self.protocol_iteration;
959        self.state = MachineState::Finished;
960        self.pending_effects.push_back(Effect::Done {
961            messages: msgs,
962            event_delta,
963            protocol_iteration,
964        });
965    }
966
967    fn next_event_delta(&mut self) -> Vec<SessionEventRecord<M::Event>> {
968        if self.progress_event_cursor >= self.events.len() {
969            self.progress_event_cursor = self.events.len();
970            return Vec::new();
971        }
972        let delta = self.events[self.progress_event_cursor..].to_vec();
973        self.progress_event_cursor = self.events.len();
974        delta
975    }
976
977    /// Drain the next pending effect. Returns `None` when the host must call
978    /// `handle_response()` before more effects become available.
979    pub fn poll_effect(&mut self) -> Option<Effect<M>> {
980        if let Some(effect) = self.pending_effects.pop_front() {
981            return Some(effect);
982        }
983        if self.active_effect_redelivery {
984            self.active_effect_redelivery = false;
985            if let Some(effect) = self.state.outstanding_effect() {
986                return Some(effect);
987            }
988        }
989
990        match &self.state {
991            MachineState::PreparingProtocol => {
992                self.prepare_protocol();
993                self.pending_effects.pop_front()
994            }
995            MachineState::PrepareIteration => {
996                self.prepare_protocol_iteration();
997                self.pending_effects.pop_front()
998            }
999            _ => None,
1000        }
1001    }
1002
1003    // ─── State transitions ───
1004
1005    fn prepare_protocol(&mut self) {
1006        if self.config.sync_execution_surface {
1007            let id = self.next_id();
1008            self.state = MachineState::WaitingExecutionSurface {
1009                effect_id: id,
1010                update_machine_config: false,
1011            };
1012            self.pending_effects
1013                .push_back(Effect::SyncExecutionSurface {
1014                    id,
1015                    update_machine_config: false,
1016                });
1017            return;
1018        }
1019
1020        self.prepare_protocol_iteration();
1021    }
1022
1023    fn prepare_protocol_iteration(&mut self) {
1024        if self.config.sync_execution_surface
1025            && self.synced_protocol_iteration != Some(self.protocol_iteration)
1026        {
1027            let id = self.next_id();
1028            self.state = MachineState::WaitingExecutionSurface {
1029                effect_id: id,
1030                update_machine_config: true,
1031            };
1032            self.pending_effects
1033                .push_back(Effect::SyncExecutionSurface {
1034                    id,
1035                    update_machine_config: true,
1036                });
1037            return;
1038        }
1039        let actions = {
1040            let driver = Arc::clone(&self.config.protocol_driver);
1041            let ctx = self.driver_context();
1042            driver.prepare_protocol_iteration(ctx)
1043        };
1044        self.apply_actions(actions);
1045    }
1046
1047    fn start_llm_request(
1048        &mut self,
1049        request: Arc<LlmRequest>,
1050        driver_state: Option<M::DriverState>,
1051    ) {
1052        let tool_list = self
1053            .config
1054            .tool_specs
1055            .iter()
1056            .map(|tool| tool.name.as_str())
1057            .collect::<Vec<_>>()
1058            .join(", ");
1059        self.emit(SessionEvent::LlmRequest {
1060            protocol_iteration: self.protocol_iteration,
1061            message_count: self.messages.len(),
1062            tool_list,
1063        });
1064
1065        let id = self.next_id();
1066        self.state = MachineState::WaitingLlm {
1067            effect_id: id,
1068            request: Arc::clone(&request),
1069            driver_state,
1070        };
1071        self.pending_effects
1072            .push_back(Effect::LlmCall { id, request });
1073    }
1074
1075    fn start_tool_calls(&mut self, calls: Vec<PendingToolCall>) {
1076        let effect_id = self.next_id();
1077        self.state = MachineState::WaitingTools {
1078            effect_id,
1079            calls: calls.clone(),
1080        };
1081        self.pending_effects.push_back(Effect::ToolCalls {
1082            id: effect_id,
1083            calls,
1084        });
1085    }
1086
1087    fn start_exec(&mut self, code: String, driver_state: M::DriverState) {
1088        let effect_id = self.next_id();
1089        self.state = MachineState::WaitingExec {
1090            effect_id,
1091            code: code.clone(),
1092            driver_state,
1093        };
1094        self.pending_effects.push_back(Effect::ExecCode {
1095            id: effect_id,
1096            code,
1097        });
1098    }
1099
1100    fn schedule_turn_limit_final(&mut self, message: Message) -> bool {
1101        let Some(_max_turns) = self.termination.turn_limit_final_to_schedule(
1102            self.protocol_iteration,
1103            self.protocol_run_offset,
1104            self.config.max_turns,
1105        ) else {
1106            return false;
1107        };
1108        self.termination.mark_turn_limit_final_scheduled();
1109        self.messages.push(message);
1110        true
1111    }
1112
1113    fn schedule_configured_turn_limit_final(&mut self) -> bool {
1114        let Some(max_turns) = self.termination.turn_limit_final_to_schedule(
1115            self.protocol_iteration,
1116            self.protocol_run_offset,
1117            self.config.max_turns,
1118        ) else {
1119            return false;
1120        };
1121        let message_id = self.next_synthetic_message_id("turn_limit");
1122        let message = (self.config.turn_limit_final_message)(message_id, max_turns);
1123        self.termination.mark_turn_limit_final_scheduled();
1124        self.messages.push(message);
1125        true
1126    }
1127
1128    fn append_event(&mut self, event: SessionEventRecord<M::Event>) {
1129        match event {
1130            SessionEventRecord::Conversation(record) => {
1131                Arc::make_mut(&mut self.events)
1132                    .push(SessionEventRecord::Conversation(record.clone()));
1133                self.messages.push(record.to_message());
1134            }
1135            SessionEventRecord::Protocol(protocol_event) => {
1136                Arc::make_mut(&mut self.events).push(SessionEventRecord::Protocol(protocol_event));
1137            }
1138        }
1139    }
1140
1141    pub fn apply_actions(&mut self, actions: Vec<DriverAction<M>>) {
1142        let mut progress_dirty = false;
1143        for action in actions {
1144            match action {
1145                DriverAction::Emit(event) => self.emit(event),
1146                DriverAction::AppendEvents(events) => {
1147                    if !events.is_empty() {
1148                        for event in events {
1149                            self.append_event(event);
1150                        }
1151                        progress_dirty = true;
1152                    }
1153                }
1154                DriverAction::StartLlm {
1155                    request,
1156                    driver_state,
1157                } => self.start_llm_request(request, driver_state),
1158                DriverAction::StartTools { calls } => self.start_tool_calls(calls),
1159                DriverAction::StartExec { code, driver_state } => {
1160                    self.start_exec(code, driver_state)
1161                }
1162                DriverAction::StartCheckpoint {
1163                    checkpoint,
1164                    on_empty,
1165                } => self.request_checkpoint(checkpoint, on_empty),
1166                DriverAction::AdvanceProtocolIteration => {
1167                    self.protocol_iteration += 1;
1168                    self.synced_protocol_iteration = None;
1169                    progress_dirty = true;
1170                }
1171                DriverAction::ScheduleTurnLimitFinal { message } => {
1172                    if self.schedule_turn_limit_final(message) {
1173                        progress_dirty = true;
1174                    }
1175                }
1176                DriverAction::Finish(outcome) => {
1177                    if progress_dirty {
1178                        self.emit_progress();
1179                        progress_dirty = false;
1180                    }
1181                    self.finish(outcome);
1182                    break;
1183                }
1184            }
1185        }
1186        if progress_dirty {
1187            self.emit_progress();
1188        }
1189    }
1190
1191    /// Feed a response to a previously emitted effect.
1192    pub fn handle_response(&mut self, response: Response) {
1193        self.active_effect_redelivery = false;
1194        match response {
1195            Response::ExecutionSurfaceSynced { id, result } => {
1196                self.handle_execution_surface_synced(id, result)
1197            }
1198            Response::LlmComplete {
1199                id,
1200                result,
1201                text_streamed,
1202            } => self.handle_llm_complete(id, result, text_streamed),
1203            Response::ToolResults { id, results } => self.handle_tool_results(id, results),
1204            Response::ExecResult { id, result } => self.handle_exec_result(id, result),
1205            Response::Checkpoint { id, delivery } => self.handle_checkpoint(id, delivery),
1206        }
1207    }
1208
1209    fn request_checkpoint(&mut self, checkpoint: CheckpointKind, on_empty: CheckpointResumeAction) {
1210        let id = self.next_id();
1211        self.state = MachineState::WaitingCheckpoint {
1212            effect_id: id,
1213            checkpoint,
1214            on_empty,
1215        };
1216        self.pending_effects
1217            .push_back(Effect::Checkpoint { id, checkpoint });
1218    }
1219
1220    fn handle_execution_surface_synced(
1221        &mut self,
1222        id: EffectId,
1223        result: Result<Option<ExecutionSurfaceSync>, String>,
1224    ) {
1225        let (waiting_id, waiting_update_machine_config) =
1226            match std::mem::replace(&mut self.state, MachineState::Finished) {
1227                MachineState::WaitingExecutionSurface {
1228                    effect_id,
1229                    update_machine_config,
1230                } => (effect_id, update_machine_config),
1231                other => {
1232                    self.state = other;
1233                    return;
1234                }
1235            };
1236        if waiting_id != id {
1237            self.state = MachineState::WaitingExecutionSurface {
1238                effect_id: waiting_id,
1239                update_machine_config: waiting_update_machine_config,
1240            };
1241            return;
1242        }
1243
1244        match result {
1245            Ok(update) => {
1246                if let Some(update) = update {
1247                    self.config.system_prompt = update.system_prompt;
1248                    self.config.tool_specs = update.tool_specs;
1249                }
1250                self.synced_protocol_iteration = Some(self.protocol_iteration);
1251                self.state = MachineState::PrepareIteration;
1252            }
1253            Err(error) => {
1254                self.fail_turn(make_error_event(
1255                    "execution_surface",
1256                    Some("reconfigure_failed"),
1257                    format!("Failed to refresh execution surface: {error}"),
1258                    Some(error),
1259                ));
1260            }
1261        }
1262    }
1263
1264    fn append_checkpoint_messages(&mut self, plugin_messages: &[PluginMessage], transient: bool) {
1265        let mut appended = Vec::new();
1266        for message in plugin_messages
1267            .iter()
1268            .filter(|message| matches!(message.role, MessageRole::User | MessageRole::System))
1269        {
1270            let message_id = self.next_synthetic_message_id("checkpoint");
1271            let mut parts = if message.parts.is_empty() {
1272                vec![Part {
1273                    id: format!("{message_id}.p0"),
1274                    kind: PartKind::Text,
1275                    content: message.content.clone(),
1276                    attachment: None,
1277                    tool_call_id: None,
1278                    tool_name: None,
1279                    tool_replay: None,
1280                    prune_state: PruneState::Intact,
1281                    reasoning_meta: None,
1282                    response_meta: None,
1283                }]
1284            } else {
1285                message.parts.clone()
1286            };
1287            reassign_part_ids(&message_id, &mut parts);
1288            appended.push(Message {
1289                id: message_id.clone(),
1290                role: message.role,
1291                parts: Arc::new(parts),
1292                origin: message.origin.clone().or_else(|| {
1293                    Some(MessageOrigin::Plugin {
1294                        plugin_id: "plugin".to_string(),
1295                        transient,
1296                    })
1297                }),
1298            });
1299        }
1300        if !appended.is_empty() {
1301            self.messages.extend(appended);
1302        }
1303    }
1304
1305    fn append_turn_causes(&mut self, causes: Vec<TurnCause>) {
1306        if causes.is_empty() {
1307            return;
1308        }
1309        let mut existing_ids = self
1310            .turn_causes
1311            .iter()
1312            .map(|cause| cause.id.clone())
1313            .collect::<HashSet<_>>();
1314        for cause in causes {
1315            if !existing_ids.insert(cause.id.clone()) {
1316                continue;
1317            }
1318            self.messages.push(cause.to_event_message());
1319            self.turn_causes.push(cause);
1320        }
1321    }
1322
1323    fn handle_checkpoint(&mut self, id: EffectId, delivery: CheckpointDelivery) {
1324        let (effect_id, checkpoint, on_empty) =
1325            match std::mem::replace(&mut self.state, MachineState::Finished) {
1326                MachineState::WaitingCheckpoint {
1327                    effect_id,
1328                    checkpoint,
1329                    on_empty,
1330                } => (effect_id, checkpoint, on_empty),
1331                other => {
1332                    self.state = other;
1333                    return;
1334                }
1335            };
1336        if effect_id != id {
1337            self.state = MachineState::WaitingCheckpoint {
1338                effect_id,
1339                checkpoint,
1340                on_empty,
1341            };
1342            return;
1343        }
1344
1345        if !delivery.messages.is_empty()
1346            || !delivery.transient_messages.is_empty()
1347            || !delivery.turn_causes.is_empty()
1348        {
1349            self.append_checkpoint_messages(&delivery.messages, false);
1350            self.append_checkpoint_messages(&delivery.transient_messages, true);
1351            self.append_turn_causes(delivery.turn_causes);
1352            if matches!(checkpoint, CheckpointKind::BeforeCompletion) {
1353                self.protocol_iteration += 1;
1354                if self.termination.should_force_exit_after_grace_turn() {
1355                    self.emit_progress();
1356                    self.finish(TurnOutcome::Stopped(TurnStop::MaxTurns));
1357                    return;
1358                }
1359                self.schedule_configured_turn_limit_final();
1360            }
1361            self.state = MachineState::PrepareIteration;
1362            self.emit_progress();
1363            return;
1364        }
1365
1366        match on_empty {
1367            CheckpointResumeAction::PrepareIteration => {
1368                self.state = MachineState::PrepareIteration;
1369            }
1370            CheckpointResumeAction::Finish(outcome) => self.finish(outcome),
1371        }
1372    }
1373
1374    fn take_waiting_llm_state(&mut self, id: EffectId) -> Option<WaitingLlmState<M>> {
1375        match std::mem::replace(&mut self.state, MachineState::Finished) {
1376            MachineState::WaitingLlm {
1377                effect_id,
1378                request,
1379                driver_state,
1380            } if effect_id == id => Some(WaitingLlmState {
1381                request,
1382                driver_state,
1383            }),
1384            other => {
1385                self.state = other;
1386                None
1387            }
1388        }
1389    }
1390
1391    fn handle_llm_complete(
1392        &mut self,
1393        id: EffectId,
1394        result: Result<LlmResponse, LlmCallError>,
1395        text_streamed: bool,
1396    ) {
1397        let Some(waiting) = self.take_waiting_llm_state(id) else {
1398            return;
1399        };
1400        match result {
1401            Err(error) => {
1402                self.emit_llm_error(error);
1403            }
1404            Ok(mut llm_response) => {
1405                // Reclassify a zero-output `OutputLimit` as `ContextOverflow`
1406                // when the prompt nearly filled the window, before the terminal
1407                // reason drives the finish decision below.
1408                refine_terminal_reason_for_context_window(
1409                    &mut llm_response,
1410                    self.config.max_context_tokens,
1411                );
1412                self.record_llm_usage(&llm_response, self.llm_response_text(&llm_response));
1413                if self.handle_terminal_llm_response(&llm_response, text_streamed) {
1414                    return;
1415                }
1416                let actions = {
1417                    let driver = Arc::clone(&self.config.protocol_driver);
1418                    let ctx = self.driver_context();
1419                    driver.handle_llm_success(ctx, waiting, llm_response, text_streamed)
1420                };
1421                self.apply_actions(actions);
1422            }
1423        }
1424    }
1425
1426    fn handle_terminal_llm_response(
1427        &mut self,
1428        llm_response: &LlmResponse,
1429        text_streamed: bool,
1430    ) -> bool {
1431        let outcome = match llm_response.terminal_reason {
1432            LlmTerminalReason::OutputLimit => TurnOutcome::Stopped(TurnStop::Incomplete),
1433            LlmTerminalReason::ContextOverflow => TurnOutcome::Stopped(TurnStop::ProviderError),
1434            LlmTerminalReason::ContentFilter => TurnOutcome::Stopped(TurnStop::ProviderError),
1435            LlmTerminalReason::ProviderError => TurnOutcome::Stopped(TurnStop::ProviderError),
1436            LlmTerminalReason::Cancelled => TurnOutcome::Stopped(TurnStop::Cancelled),
1437            LlmTerminalReason::Stop | LlmTerminalReason::ToolUse | LlmTerminalReason::Unknown => {
1438                return false;
1439            }
1440        };
1441
1442        if !text_streamed && !llm_response.full_text.is_empty() {
1443            self.emit(SessionEvent::TextDelta {
1444                content: llm_response.full_text.clone(),
1445            });
1446        }
1447        self.emit(SessionEvent::LlmResponse {
1448            protocol_iteration: self.protocol_iteration,
1449            content: llm_response.full_text.clone(),
1450            duration_ms: 0,
1451        });
1452        let reason = llm_response.terminal_reason;
1453        let diagnostic = llm_response
1454            .terminal_diagnostic
1455            .clone()
1456            .unwrap_or_else(|| format!("Model call ended with terminal reason {reason:?}."));
1457        self.emit(SessionEvent::Error {
1458            message: diagnostic.clone(),
1459            envelope: Some(crate::session_model::make_error_envelope(
1460                "llm_provider",
1461                Some(reason.code()),
1462                Some(reason),
1463                diagnostic,
1464                None,
1465            )),
1466        });
1467        self.finish(outcome);
1468        true
1469    }
1470
1471    fn llm_response_text<'a>(&self, llm_response: &'a LlmResponse) -> &'a str {
1472        &llm_response.full_text
1473    }
1474
1475    fn llm_response_debug_parts(&self, llm_response: &LlmResponse) -> Option<Value> {
1476        let parts = llm_response
1477            .parts
1478            .iter()
1479            .filter_map(|part| match part {
1480                LlmOutputPart::Text { text, .. } if !text.is_empty() => Some(serde_json::json!({
1481                    "type": "text",
1482                    "text": text,
1483                })),
1484                LlmOutputPart::Text { .. } => None,
1485                LlmOutputPart::Reasoning {
1486                    text,
1487                    replay,
1488                } => Some(serde_json::json!({
1489                    "type": "reasoning",
1490                    "id": replay.as_ref().and_then(|meta| meta.item_id.as_ref()),
1491                    "summary": replay.as_ref().map(|meta| &meta.summary),
1492                    "text": text,
1493                    "has_encrypted": replay.as_ref().is_some_and(|meta| meta.encrypted_content.is_some() || meta.signature.is_some()),
1494                    "redacted": replay.as_ref().is_some_and(|meta| meta.redacted),
1495                })),
1496                LlmOutputPart::ToolCall {
1497                    call_id,
1498                    tool_name,
1499                    input_json,
1500                    replay,
1501                } => Some(serde_json::json!({
1502                    "type": "tool_call",
1503                    "call_id": call_id,
1504                    "tool_name": tool_name,
1505                    "input_json": input_json,
1506                    "id": replay.as_ref().and_then(|meta| meta.item_id.as_ref()),
1507                    "has_opaque": replay.as_ref().is_some_and(|meta| meta.opaque.is_some()),
1508                })),
1509            })
1510            .collect::<Vec<_>>();
1511        (!parts.is_empty()).then_some(Value::Array(parts))
1512    }
1513
1514    fn record_llm_usage(&mut self, llm_response: &LlmResponse, response_text: &str) {
1515        let usage = token_usage_from_llm_usage(&llm_response.usage);
1516        self.cumulative_usage.add(&usage);
1517        self.emit(SessionEvent::TokenUsage {
1518            protocol_iteration: self.protocol_iteration,
1519            usage: usage.clone(),
1520            cumulative: self.cumulative_usage.clone(),
1521        });
1522        if self.config.emit_llm_trace {
1523            let response_parts = self.llm_response_debug_parts(llm_response);
1524            self.pending_effects.push_back(Effect::Log {
1525                event: LogEvent::LlmDebug {
1526                    session_id: self.config.session_id.clone(),
1527                    protocol_iteration: self.protocol_iteration,
1528                    usage,
1529                    provider_usage: llm_response.provider_usage.clone(),
1530                    request_body: llm_response.request_body.clone(),
1531                    response_text: response_text.to_string(),
1532                    response_parts,
1533                },
1534            });
1535        }
1536    }
1537
1538    fn record_llm_error(&mut self, error: &LlmCallError) {
1539        if self.config.emit_llm_trace {
1540            self.pending_effects.push_back(Effect::Log {
1541                event: LogEvent::LlmError {
1542                    session_id: self.config.session_id.clone(),
1543                    protocol_iteration: self.protocol_iteration,
1544                    request_body: error.request_body.clone(),
1545                    message: error.message.clone(),
1546                    retryable: error.retryable,
1547                    raw: error.raw.clone(),
1548                    code: error.code.clone(),
1549                    terminal_reason: error.terminal_reason,
1550                },
1551            });
1552        }
1553    }
1554
1555    fn emit_llm_error(&mut self, error: LlmCallError) {
1556        self.record_llm_error(&error);
1557        self.emit(SessionEvent::Error {
1558            message: format!("LLM error: {}", error.message),
1559            envelope: Some(crate::session_model::make_error_envelope(
1560                "llm_provider",
1561                error.code.as_deref(),
1562                Some(error.terminal_reason),
1563                format!("LLM error: {}", error.message),
1564                error.raw,
1565            )),
1566        });
1567        self.finish(TurnOutcome::Stopped(TurnStop::ProviderError));
1568    }
1569
1570    fn handle_tool_results(&mut self, id: EffectId, completed: Vec<CompletedToolCall>) {
1571        let (waiting_effect_id, waiting_calls) =
1572            match std::mem::replace(&mut self.state, MachineState::Finished) {
1573                MachineState::WaitingTools { effect_id, calls } => (effect_id, calls),
1574                other => {
1575                    self.state = other;
1576                    return;
1577                }
1578            };
1579
1580        if waiting_effect_id != id {
1581            self.state = MachineState::WaitingTools {
1582                effect_id: waiting_effect_id,
1583                calls: waiting_calls,
1584            };
1585            return;
1586        }
1587
1588        for outcome in &completed {
1589            self.emit(SessionEvent::ToolCall {
1590                call_id: Some(outcome.call_id.clone()),
1591                name: outcome.tool_name.clone(),
1592                args: outcome.args.clone(),
1593                output: outcome.output.clone(),
1594                duration_ms: outcome.duration_ms,
1595            });
1596        }
1597
1598        let actions = {
1599            let driver = Arc::clone(&self.config.protocol_driver);
1600            let ctx = self.driver_context();
1601            driver.handle_tool_results(ctx, completed)
1602        };
1603        self.apply_actions(actions);
1604    }
1605
1606    fn take_waiting_exec_state(&mut self, id: EffectId) -> Option<WaitingExecState<M>> {
1607        match std::mem::replace(&mut self.state, MachineState::Finished) {
1608            MachineState::WaitingExec {
1609                effect_id,
1610                code: _,
1611                driver_state,
1612            } if effect_id == id => Some(WaitingExecState { driver_state }),
1613            other => {
1614                self.state = other;
1615                None
1616            }
1617        }
1618    }
1619
1620    fn handle_exec_result(&mut self, id: EffectId, result: Result<crate::ExecResponse, String>) {
1621        let Some(waiting) = self.take_waiting_exec_state(id) else {
1622            return;
1623        };
1624        let actions = {
1625            let driver = Arc::clone(&self.config.protocol_driver);
1626            let ctx = self.driver_context();
1627            driver.handle_exec_result(ctx, waiting, result)
1628        };
1629        self.apply_actions(actions);
1630    }
1631}
1632
1633fn token_usage_from_llm_usage(usage: &crate::llm::types::LlmUsage) -> TokenUsage {
1634    TokenUsage {
1635        input_tokens: usage.input_tokens,
1636        output_tokens: usage.output_tokens,
1637        cached_input_tokens: usage.cached_input_tokens,
1638        reasoning_tokens: usage.reasoning_tokens,
1639    }
1640}
1641
1642/// Reclassify a zero-output `OutputLimit` terminal reason as `ContextOverflow`
1643/// when the prompt nearly filled the model's context window.
1644///
1645/// Pure policy: the kernel owns the terminal-reason interpretation, so the
1646/// provider's raw reason is refined here (before it drives the finish decision
1647/// in `handle_terminal_llm_response`) rather than in the host I/O layer. A
1648/// `None` window disables the refinement.
1649fn refine_terminal_reason_for_context_window(
1650    response: &mut LlmResponse,
1651    max_context_tokens: Option<usize>,
1652) {
1653    if response.terminal_reason != LlmTerminalReason::OutputLimit {
1654        return;
1655    }
1656    if response.usage.output_tokens != 0 {
1657        return;
1658    }
1659    let Some(max_context_tokens) = max_context_tokens.filter(|value| *value > 0) else {
1660        return;
1661    };
1662    let prompt_tokens = response
1663        .usage
1664        .input_tokens
1665        .saturating_add(response.usage.cached_input_tokens)
1666        .max(0) as usize;
1667    if prompt_tokens >= max_context_tokens.saturating_mul(95) / 100 {
1668        response.terminal_reason = LlmTerminalReason::ContextOverflow;
1669        response.terminal_diagnostic = Some(
1670            "Model produced no output because the prompt reached the configured context window."
1671                .to_string(),
1672        );
1673    }
1674}
1675
1676#[cfg(test)]
1677mod tests;