Skip to main content

lash_sansio/sansio/
mod.rs

1//! Sans-IO state machine for session turns.
2//!
3//! `TurnMachine` owns the generic effect engine. Protocol-specific behavior
4//! lives behind `ProtocolDriverHandle`, which returns declarative
5//! `DriverAction`s that the machine applies.
6
7use std::collections::{HashSet, VecDeque};
8use std::fmt::Debug;
9use std::sync::Arc;
10
11use serde::Serialize;
12use serde::de::DeserializeOwned;
13use serde_json::Value;
14
15use crate::llm::types::{
16    LlmAttachment, LlmOutputPart, LlmRequest, LlmResponse, LlmTerminalReason, LlmToolChoice,
17    LlmToolSpec, ProviderReplayMeta,
18};
19use crate::session_model::message::MessageOrigin;
20use crate::session_model::{
21    Message, MessageRole, MessageSequence, Part, PartKind, PruneState, SessionEvent,
22    SessionEventRecord, TokenUsage, ToolEvent, TurnTerminationPolicyState, make_error_event,
23    reassign_part_ids, render_prompt,
24};
25use crate::{
26    CheckpointKind, ModelToolReturn, PluginMessage, ToolCallOutput, TurnOutcome, TurnStop,
27};
28
29// ─── Public types ───
30
/// Type-level description of a turn protocol.
///
/// The trait has no methods: implementors only name the three
/// protocol-specific types that the generic machinery in this module is
/// parameterized over.
pub trait TurnProtocol: Send + Sync + 'static {
    /// Payload type of the `SessionEventRecord`s held by the machine.
    type Event: Clone + Serialize + DeserializeOwned + Debug + Send + Sync + 'static;
    /// Protocol-specific termination data stored in `TurnMachineConfig::termination`.
    type Termination: Clone + Default + Debug + Send + Sync + 'static;
    /// Driver-owned state parked in the machine while an LLM or exec effect is outstanding.
    type DriverState: Clone + Default + Serialize + DeserializeOwned + Debug + Send + Sync + 'static;
}
36
37#[derive(Clone, Debug, Serialize, serde::Deserialize)]
38pub struct UnitTurnProtocol;
39
40impl TurnProtocol for UnitTurnProtocol {
41    type Event = ();
42    type Termination = ();
43    type DriverState = serde_json::Value;
44}
45
/// Opaque identifier linking an effect to its response.
///
/// A thin `Copy` wrapper around a `u64`; it derives `Eq` and `Hash` as well
/// as the serde traits.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, serde::Deserialize)]
pub struct EffectId(pub u64);
49
/// A tool call that has been scheduled but not yet executed.
///
/// Carried by `DriverAction::StartTools`, `Effect::ToolCalls` and
/// `MachineState::WaitingTools`.
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct PendingToolCall {
    /// Identifier of this call; `CompletedToolCall` carries a field of the same name.
    pub call_id: String,
    /// Name of the tool to invoke.
    pub tool_name: String,
    /// Arguments for the tool as a JSON value.
    pub args: Value,
    /// Opaque provider replay state carried through for the next request.
    pub replay: Option<ProviderReplayMeta>,
}
58
/// The outcome of a tool call, delivered to the machine in
/// `Response::ToolResults`.
///
/// Repeats the `call_id`, `tool_name`, `args` and `replay` fields of
/// [`PendingToolCall`] and adds the results.
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct CompletedToolCall {
    pub call_id: String,
    pub tool_name: String,
    pub args: Value,
    /// Output of the tool call.
    pub output: ToolCallOutput,
    /// NOTE(review): presumably the form of the result shown to the model — confirm
    /// against `ModelToolReturn`.
    pub model_return: ModelToolReturn,
    /// NOTE(review): presumably the execution time in milliseconds — confirm with the host.
    pub duration_ms: u64,
    /// See [`PendingToolCall::replay`].
    pub replay: Option<ProviderReplayMeta>,
}
70
/// An event recorded as a cause of the current turn.
///
/// Causes are rendered into the prompt by `render_turn_causes_prompt` and can
/// be turned into an `Event`-role message with `TurnCause::to_event_message`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, serde::Deserialize)]
pub struct TurnCause {
    /// Identifier of the cause; reused as the id of the derived event message.
    pub id: String,
    /// Event type label shown in the rendered header.
    pub event_type: String,
    /// Where the event came from.
    pub origin: MessageOrigin,
    /// Event text; trimmed when rendered into the prompt.
    pub text: String,
}
78
79impl TurnCause {
80    pub fn to_event_message(&self) -> Message {
81        Message {
82            id: self.id.clone(),
83            role: MessageRole::Event,
84            parts: Arc::new(vec![Part {
85                id: format!("{}.p0", self.id),
86                kind: PartKind::Text,
87                content: self.text.clone(),
88                attachment: None,
89                tool_call_id: None,
90                tool_name: None,
91                tool_replay: None,
92                prune_state: PruneState::Intact,
93                reasoning_meta: None,
94                response_meta: None,
95            }]),
96            origin: Some(self.origin.clone()),
97        }
98    }
99}
100
/// Payload of `Response::Checkpoint`: what a checkpoint hands back to the
/// machine. `Default` yields three empty lists.
#[derive(Clone, Debug, Default, Serialize, serde::Deserialize)]
pub struct CheckpointDelivery {
    /// Plugin messages delivered by the checkpoint.
    pub messages: Vec<PluginMessage>,
    /// NOTE(review): presumably plugin messages that are not persisted — confirm
    /// where the delivery is consumed.
    pub transient_messages: Vec<PluginMessage>,
    /// Turn causes delivered by the checkpoint.
    pub turn_causes: Vec<TurnCause>,
}
107
108pub fn render_turn_causes_prompt(causes: &[TurnCause]) -> Option<String> {
109    if causes.is_empty() {
110        return None;
111    }
112
113    let mut rendered = String::from("=== TURN EVENTS ===");
114    for (index, cause) in causes.iter().enumerate() {
115        rendered.push_str("\n\n");
116        rendered.push_str(&format!(
117            "--- event[{index}] · {} · {} ---\n",
118            cause.event_type, cause.id
119        ));
120        rendered.push_str("Origin: ");
121        rendered.push_str(&render_message_origin(&cause.origin));
122        rendered.push_str("\n\n");
123        rendered.push_str(cause.text.trim());
124    }
125    Some(rendered)
126}
127
128fn render_message_origin(origin: &MessageOrigin) -> String {
129    match origin {
130        MessageOrigin::Plugin {
131            plugin_id,
132            transient,
133        } => {
134            if *transient {
135                format!("plugin {plugin_id} (transient)")
136            } else {
137                format!("plugin {plugin_id}")
138            }
139        }
140        MessageOrigin::Process {
141            process_id,
142            event_type,
143            sequence,
144            wake_id,
145            ..
146        } => match wake_id {
147            Some(wake_id) => {
148                format!("process {process_id} {event_type} #{sequence} ({wake_id})")
149            }
150            None => format!("process {process_id} {event_type} #{sequence}"),
151        },
152    }
153}
154
/// Structured log record carried by `Effect::Log`.
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub enum LogEvent {
    /// Debug record for an LLM exchange: token usage plus optional request
    /// body, provider usage and response parts.
    LlmDebug {
        session_id: String,
        protocol_iteration: usize,
        usage: TokenUsage,
        provider_usage: Option<Value>,
        request_body: Option<String>,
        response_text: String,
        response_parts: Option<Value>,
    },
    /// Record of a failed LLM call. Apart from `session_id` and
    /// `protocol_iteration`, the fields mirror those of `LlmCallError`.
    LlmError {
        session_id: String,
        protocol_iteration: usize,
        request_body: Option<String>,
        message: String,
        retryable: bool,
        raw: Option<String>,
        code: Option<String>,
        terminal_reason: LlmTerminalReason,
    },
}
177
/// An effect the host must fulfil.
///
/// Variants that carry an [`EffectId`] expose it through `Effect::id`; `Log`,
/// `Emit`, `Progress` and `Done` carry no id.
//
// `Clone` is implemented by hand below rather than derived: the derive would
// demand `M: Clone`, but only `M::Event` is ever cloned (and `TurnProtocol`
// already guarantees `Event: Clone`), so a manual impl keeps `Effect<M>`
// cloneable for every protocol — which the turn checkpoint relies on.
#[derive(Debug, Serialize, serde::Deserialize)]
#[allow(clippy::large_enum_variant)]
pub enum Effect<M: TurnProtocol = UnitTurnProtocol> {
    /// Sync the live execution surface before the turn proceeds.
    ///
    /// `update_machine_config` is only needed after the turn has
    /// already advanced at least once and the host may need to swap in
    /// a refreshed system prompt or tool schema for the next
    /// protocol iteration. Initial syncs are host-only because the machine was
    /// already constructed from a fresh execution surface.
    SyncExecutionSurface {
        id: EffectId,
        update_machine_config: bool,
    },
    /// Start an LLM call.
    LlmCall {
        id: EffectId,
        /// The request to send; shared via `Arc`, so cloning the effect does not copy it.
        request: Arc<LlmRequest>,
    },
    /// Cancel an in-progress LLM stream.
    CancelLlm { id: EffectId },
    /// Execute one or more driver-scheduled tool calls.
    ToolCalls {
        id: EffectId,
        calls: Vec<PendingToolCall>,
    },
    /// Execute a protocol-owned code block.
    ExecCode { id: EffectId, code: String },
    /// Run a host/plugin checkpoint before the machine continues or completes.
    Checkpoint {
        id: EffectId,
        checkpoint: CheckpointKind,
    },
    /// Host-implemented fire-and-forget logging.
    Log { event: LogEvent },
    /// Fire-and-forget event (no response needed).
    Emit(SessionEvent),
    /// Prompt-history progress that may be durably persisted by the host.
    ///
    /// This is separate from [`SessionEvent`]: UI stream events can be partial,
    /// duplicated, or display-only, while `Progress` is emitted only after the
    /// state machine has applied semantic message or protocol-step changes.
    Progress {
        messages: MessageSequence,
        event_delta: Vec<SessionEventRecord<M::Event>>,
        protocol_iteration: usize,
    },
    /// Turn is done.
    ///
    /// Carries the same three fields as `Progress`.
    Done {
        messages: MessageSequence,
        event_delta: Vec<SessionEventRecord<M::Event>>,
        protocol_iteration: usize,
    },
}
238
239impl<M: TurnProtocol> Clone for Effect<M> {
240    fn clone(&self) -> Self {
241        match self {
242            Self::SyncExecutionSurface {
243                id,
244                update_machine_config,
245            } => Self::SyncExecutionSurface {
246                id: *id,
247                update_machine_config: *update_machine_config,
248            },
249            Self::LlmCall { id, request } => Self::LlmCall {
250                id: *id,
251                request: Arc::clone(request),
252            },
253            Self::CancelLlm { id } => Self::CancelLlm { id: *id },
254            Self::ToolCalls { id, calls } => Self::ToolCalls {
255                id: *id,
256                calls: calls.clone(),
257            },
258            Self::ExecCode { id, code } => Self::ExecCode {
259                id: *id,
260                code: code.clone(),
261            },
262            Self::Checkpoint { id, checkpoint } => Self::Checkpoint {
263                id: *id,
264                checkpoint: *checkpoint,
265            },
266            Self::Log { event } => Self::Log {
267                event: event.clone(),
268            },
269            Self::Emit(event) => Self::Emit(event.clone()),
270            Self::Progress {
271                messages,
272                event_delta,
273                protocol_iteration,
274            } => Self::Progress {
275                messages: messages.clone(),
276                event_delta: event_delta.clone(),
277                protocol_iteration: *protocol_iteration,
278            },
279            Self::Done {
280                messages,
281                event_delta,
282                protocol_iteration,
283            } => Self::Done {
284                messages: messages.clone(),
285                event_delta: event_delta.clone(),
286                protocol_iteration: *protocol_iteration,
287            },
288        }
289    }
290}
291
292impl<M: TurnProtocol> Effect<M> {
293    fn id(&self) -> Option<EffectId> {
294        match self {
295            Self::SyncExecutionSurface { id, .. }
296            | Self::LlmCall { id, .. }
297            | Self::CancelLlm { id }
298            | Self::ToolCalls { id, .. }
299            | Self::ExecCode { id, .. }
300            | Self::Checkpoint { id, .. } => Some(*id),
301            Self::Log { .. } | Self::Emit(_) | Self::Progress { .. } | Self::Done { .. } => None,
302        }
303    }
304}
305
/// Error details from a failed LLM call.
///
/// This is the `Err` payload of `Response::LlmComplete`.
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct LlmCallError {
    /// Error message text.
    pub message: String,
    /// Whether the failure is flagged as retryable.
    pub retryable: bool,
    /// NOTE(review): presumably the unprocessed provider error text — confirm with the host.
    pub raw: Option<String>,
    /// Optional error code.
    pub code: Option<String>,
    /// Terminal reason associated with the failed call.
    pub terminal_reason: LlmTerminalReason,
    /// Body of the request that failed, when available.
    pub request_body: Option<String>,
}
316
/// A response to a previously emitted effect.
///
/// Every variant carries the [`EffectId`] of the effect it answers.
pub enum Response {
    /// Live execution surface sync completed.
    ExecutionSurfaceSynced {
        id: EffectId,
        /// On success, an optional refreshed system prompt and tool specs; on
        /// failure, an error message.
        result: Result<Option<ExecutionSurfaceSync>, String>,
    },
    /// Full LLM response.
    LlmComplete {
        id: EffectId,
        result: Result<LlmResponse, LlmCallError>,
        /// When true, text deltas were already emitted during streaming,
        /// so the driver should skip emitting `TextDelta` events.
        text_streamed: bool,
    },
    /// Native tool results.
    ToolResults {
        id: EffectId,
        results: Vec<CompletedToolCall>,
    },
    /// Mode code execution result.
    ExecResult {
        id: EffectId,
        /// The exec response, or an error message on failure.
        result: Result<crate::ExecResponse, String>,
    },
    /// Checkpoint result with optional injected messages.
    Checkpoint {
        id: EffectId,
        delivery: CheckpointDelivery,
    },
}
348
/// System prompt and tool specs returned by a successful execution-surface
/// sync (see `Response::ExecutionSurfaceSynced`).
///
/// The field types match `TurnMachineConfig::system_prompt` and
/// `TurnMachineConfig::tool_specs`.
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct ExecutionSurfaceSync {
    pub system_prompt: Arc<str>,
    pub tool_specs: Arc<Vec<LlmToolSpec>>,
}
354
355pub struct WaitingLlmState<M: TurnProtocol = UnitTurnProtocol> {
356    pub request: Arc<LlmRequest>,
357    driver_state: Option<M::DriverState>,
358}
359
360impl<M: TurnProtocol> WaitingLlmState<M> {
361    pub fn take_driver_state(&mut self) -> Option<M::DriverState> {
362        self.driver_state.take()
363    }
364}
365
366pub struct WaitingExecState<M: TurnProtocol = UnitTurnProtocol> {
367    driver_state: M::DriverState,
368}
369
370impl<M: TurnProtocol> WaitingExecState<M> {
371    pub fn into_driver_state(self) -> M::DriverState {
372        self.driver_state
373    }
374}
375
/// How the machine resumes after a checkpoint.
///
/// Stored as the `on_empty` field of `DriverAction::StartCheckpoint` and
/// `MachineState::WaitingCheckpoint`.
/// NOTE(review): the field name suggests it applies when the checkpoint
/// delivers nothing; the handling code is outside this view — confirm.
#[derive(Clone, Debug, PartialEq, Serialize, serde::Deserialize)]
pub enum CheckpointResumeAction {
    /// Continue by preparing another iteration.
    PrepareIteration,
    /// Finish the turn with the given outcome.
    Finish(TurnOutcome),
}
381
/// A declarative instruction returned by a `ProtocolDriverHandle` for the
/// machine to apply.
///
/// The `Start*` variants mirror the `Waiting*` machine states field for field.
/// NOTE(review): the code that applies each action is outside this view, so
/// the per-variant notes below describe the payloads only.
#[allow(clippy::large_enum_variant)]
pub enum DriverAction<M: TurnProtocol = UnitTurnProtocol> {
    /// A session event to emit (compare `Effect::Emit`).
    Emit(SessionEvent),
    /// Event records to append.
    AppendEvents(Vec<SessionEventRecord<M::Event>>),
    /// Start an LLM call with `request`, optionally parking driver state.
    StartLlm {
        request: Arc<LlmRequest>,
        driver_state: Option<M::DriverState>,
    },
    /// Start the given tool calls.
    StartTools {
        calls: Vec<PendingToolCall>,
    },
    /// Execute `code`, parking driver state until the result arrives.
    StartExec {
        code: String,
        driver_state: M::DriverState,
    },
    /// Start a checkpoint of the given kind.
    StartCheckpoint {
        checkpoint: CheckpointKind,
        on_empty: CheckpointResumeAction,
    },
    /// Advance the protocol iteration.
    AdvanceProtocolIteration,
    /// Schedule the turn-limit final step with the given message.
    ScheduleTurnLimitFinal {
        message: Message,
    },
    /// Finish the turn with the given outcome.
    Finish(TurnOutcome),
}
407
/// Read-only view of the machine passed to `ProtocolDriverHandle` callbacks.
///
/// All fields are private; drivers read them through the accessor methods.
pub struct DriverContextView<'a, M: TurnProtocol = UnitTurnProtocol> {
    config: &'a TurnMachineConfig<M>,
    messages: &'a MessageSequence,
    events: &'a [SessionEventRecord<M::Event>],
    turn_causes: &'a [TurnCause],
    protocol_iteration: usize,
    protocol_run_offset: usize,
    // Termination *policy state* of the machine; distinct from the
    // protocol-specific `config.termination` value.
    termination: &'a TurnTerminationPolicyState,
}
417
418impl<'a, M: TurnProtocol> DriverContextView<'a, M> {
419    pub fn project_llm_request(&self, use_tools: bool) -> Arc<LlmRequest> {
420        self.config.projector.project(ProjectorContext {
421            config: self.config,
422            messages: self.messages,
423            events: self.events,
424            turn_causes: self.turn_causes,
425            protocol_iteration: self.protocol_iteration,
426            use_tools,
427        })
428    }
429
430    pub fn protocol_iteration(&self) -> usize {
431        self.protocol_iteration
432    }
433
434    pub fn protocol_run_offset(&self) -> usize {
435        self.protocol_run_offset
436    }
437
438    pub fn max_turns(&self) -> Option<usize> {
439        self.config.max_turns
440    }
441
442    pub fn termination(&self) -> &M::Termination {
443        &self.config.termination
444    }
445
446    pub fn autonomous(&self) -> bool {
447        self.config.autonomous
448    }
449
450    pub fn should_force_exit_after_grace_turn(&self) -> bool {
451        self.termination.should_force_exit_after_grace_turn()
452    }
453
454    pub fn turn_limit_final_to_schedule(&self) -> Option<usize> {
455        self.termination.turn_limit_final_to_schedule(
456            self.protocol_iteration,
457            self.protocol_run_offset,
458            self.config.max_turns,
459        )
460    }
461
462    pub fn messages(&self) -> &MessageSequence {
463        self.messages
464    }
465
466    pub fn events(&self) -> &[SessionEventRecord<M::Event>] {
467        self.events
468    }
469
470    pub fn turn_causes(&self) -> &[TurnCause] {
471        self.turn_causes
472    }
473}
474
/// Inputs handed to a `ContextProjector` when building an LLM request.
pub struct ProjectorContext<'a, M: TurnProtocol = UnitTurnProtocol> {
    /// Machine configuration (model, system prompt, tool specs, ...).
    pub config: &'a TurnMachineConfig<M>,
    /// Session messages to project.
    pub messages: &'a MessageSequence,
    /// Event records held by the machine.
    pub events: &'a [SessionEventRecord<M::Event>],
    /// Causes of the current turn.
    pub turn_causes: &'a [TurnCause],
    /// Current protocol iteration.
    pub protocol_iteration: usize,
    /// Whether the request should offer tools to the model.
    pub use_tools: bool,
}
483
/// Strategy for turning machine context into an LLM request.
///
/// Stored as `TurnMachineConfig::projector` and invoked by
/// `DriverContextView::project_llm_request`.
pub trait ContextProjector<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
    /// Builds the request for the given context.
    fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest>;
}
487
488#[derive(Clone, Debug, Default)]
489pub struct ChatContextProjector;
490
491impl<M: TurnProtocol> ContextProjector<M> for ChatContextProjector {
492    fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest> {
493        let rendered_prompt = render_messages_for_projector(ctx.messages, ctx.turn_causes);
494        let attachments: Vec<LlmAttachment> = rendered_prompt.attachments;
495        let mut messages = rendered_prompt.messages;
496        if let Some(turn_events) = render_turn_causes_prompt(ctx.turn_causes) {
497            messages.push(crate::llm::types::LlmMessage::text(
498                crate::llm::types::LlmRole::User,
499                Arc::from(turn_events),
500            ));
501        }
502        if !ctx.config.system_prompt.trim().is_empty() {
503            messages.insert(
504                0,
505                crate::llm::types::LlmMessage::text(
506                    crate::llm::types::LlmRole::System,
507                    Arc::clone(&ctx.config.system_prompt),
508                ),
509            );
510        }
511
512        Arc::new(LlmRequest {
513            model: ctx.config.model.clone(),
514            messages,
515            attachments,
516            tools: if ctx.use_tools {
517                Arc::clone(&ctx.config.tool_specs)
518            } else {
519                Arc::new(Vec::new())
520            },
521            tool_choice: if ctx.use_tools {
522                LlmToolChoice::Auto
523            } else {
524                LlmToolChoice::None
525            },
526            model_variant: ctx.config.model_variant.clone(),
527            generation: ctx.config.generation.clone(),
528            session_id: ctx.config.run_session_id.clone(),
529            output_spec: None,
530            stream_events: None,
531            provider_trace: None,
532        })
533    }
534}
535
536fn render_messages_for_projector(
537    messages: &MessageSequence,
538    turn_causes: &[TurnCause],
539) -> crate::RenderedPrompt {
540    if turn_causes.is_empty() {
541        return messages.render_prompt();
542    }
543
544    let active_cause_ids = turn_causes
545        .iter()
546        .map(|cause| cause.id.as_str())
547        .collect::<HashSet<_>>();
548    let filtered = messages
549        .iter()
550        .filter(|message| {
551            !(matches!(message.role, MessageRole::Event)
552                && active_cause_ids.contains(message.id.as_str()))
553        })
554        .cloned()
555        .collect::<Vec<_>>();
556    render_prompt(filtered.as_slice())
557}
558
/// Protocol-specific behavior plugged into the generic turn machine.
///
/// Each callback receives a read-only `DriverContextView` and returns the
/// `DriverAction`s the machine should apply.
/// NOTE(review): the call sites are outside this view; the notes below are
/// inferred from the signatures and the matching `Response` variants.
pub trait ProtocolDriverHandle<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
    /// Decide how a protocol iteration begins.
    fn prepare_protocol_iteration(&self, ctx: DriverContextView<'_, M>) -> Vec<DriverAction<M>>;
    /// Handle a successful LLM response. `waiting` carries the original
    /// request and any parked driver state; `text_streamed` has the meaning
    /// documented on `Response::LlmComplete`.
    fn handle_llm_success(
        &self,
        ctx: DriverContextView<'_, M>,
        waiting: WaitingLlmState<M>,
        llm_response: LlmResponse,
        text_streamed: bool,
    ) -> Vec<DriverAction<M>>;
    /// Handle completed tool calls.
    fn handle_tool_results(
        &self,
        ctx: DriverContextView<'_, M>,
        completed: Vec<CompletedToolCall>,
    ) -> Vec<DriverAction<M>>;
    /// Handle the result (success or error message) of a code execution;
    /// `waiting` carries the parked driver state.
    fn handle_exec_result(
        &self,
        ctx: DriverContextView<'_, M>,
        waiting: WaitingExecState<M>,
        result: Result<crate::ExecResponse, String>,
    ) -> Vec<DriverAction<M>>;
}
580
/// Configuration for a `TurnMachine` instance.
pub struct TurnMachineConfig<M: TurnProtocol = UnitTurnProtocol> {
    /// Driver supplying the protocol-specific behavior.
    pub protocol_driver: Arc<dyn ProtocolDriverHandle<M>>,
    /// Projector used to build LLM requests.
    pub projector: Arc<dyn ContextProjector<M>>,
    /// NOTE(review): presumably enables `Effect::SyncExecutionSurface` — confirm
    /// in the stepping code.
    pub sync_execution_surface: bool,
    /// Model identifier copied into each `LlmRequest`.
    pub model: String,
    /// Model context-window size in tokens, if known. Lets the kernel
    /// reclassify a zero-output `OutputLimit` terminal reason as
    /// `ContextOverflow` when the prompt nearly filled the window. `None`
    /// disables that refinement.
    pub max_context_tokens: Option<usize>,
    /// Turn limit passed to the termination policy; `None` means no limit is configured.
    pub max_turns: Option<usize>,
    /// Optional model variant copied into each `LlmRequest`.
    pub model_variant: Option<String>,
    /// Generation options copied into each `LlmRequest`.
    pub generation: crate::llm::types::GenerationOptions,
    /// Copied into `LlmRequest::session_id` by `ChatContextProjector`.
    pub run_session_id: Option<String>,
    /// Exposed to drivers through `DriverContextView::autonomous`.
    pub autonomous: bool,
    /// Tool specs offered to the model when a request uses tools.
    pub tool_specs: Arc<Vec<LlmToolSpec>>,
    /// System prompt; `ChatContextProjector` omits it when blank.
    pub system_prompt: Arc<str>,
    /// Session identifier.
    pub session_id: String,
    /// NOTE(review): presumably gates `LogEvent::LlmDebug` output — confirm.
    pub emit_llm_trace: bool,
    /// Protocol-specific termination value, exposed through
    /// `DriverContextView::termination`.
    pub termination: M::Termination,
    /// Message configuration for the turn-limit final step.
    pub turn_limit_final_message: crate::TurnLimitFinalMessage,
}
604
605// ─── Internal state ───
606
/// Internal state of the turn machine.
///
/// Each `Waiting*` variant records the id of the effect it is waiting on,
/// together with enough data to rebuild that effect (see
/// `MachineState::outstanding_effect`).
#[derive(Debug, Serialize, serde::Deserialize)]
enum MachineState<M: TurnProtocol = UnitTurnProtocol> {
    /// Initial state set by the constructors.
    PreparingProtocol,
    /// Waiting for an execution-surface sync.
    WaitingExecutionSurface {
        effect_id: EffectId,
        update_machine_config: bool,
    },
    /// Ready to prepare the next iteration; no effect outstanding.
    PrepareIteration,
    /// Waiting for an LLM call; `driver_state` is parked for the driver.
    WaitingLlm {
        effect_id: EffectId,
        request: Arc<LlmRequest>,
        driver_state: Option<M::DriverState>,
    },
    /// Waiting for tool calls to complete.
    WaitingTools {
        effect_id: EffectId,
        calls: Vec<PendingToolCall>,
    },
    /// Waiting for code execution; `driver_state` is parked for the driver.
    WaitingExec {
        effect_id: EffectId,
        code: String,
        driver_state: M::DriverState,
    },
    /// Waiting for a checkpoint.
    WaitingCheckpoint {
        effect_id: EffectId,
        checkpoint: CheckpointKind,
        on_empty: CheckpointResumeAction,
    },
    /// The turn is over (`TurnMachine::is_done` returns true).
    Finished,
}
636
637#[derive(Clone, Debug, Serialize, serde::Deserialize)]
638pub struct TurnCheckpoint<M: TurnProtocol = UnitTurnProtocol> {
639    state: MachineState<M>,
640    pending_effects: Vec<Effect<M>>,
641    next_effect_id: u64,
642    #[serde(default)]
643    next_synthetic_message_id: u64,
644    messages: Vec<Message>,
645    events: Vec<SessionEventRecord<M::Event>>,
646    #[serde(default)]
647    turn_causes: Vec<TurnCause>,
648    #[serde(default)]
649    progress_event_cursor: usize,
650    protocol_iteration: usize,
651    protocol_run_offset: usize,
652    cumulative_usage: TokenUsage,
653    termination: TurnTerminationPolicyState,
654    synced_protocol_iteration: Option<usize>,
655}
656
657impl<M: TurnProtocol> Clone for MachineState<M> {
658    fn clone(&self) -> Self {
659        match self {
660            Self::PreparingProtocol => Self::PreparingProtocol,
661            Self::WaitingExecutionSurface {
662                effect_id,
663                update_machine_config,
664            } => Self::WaitingExecutionSurface {
665                effect_id: *effect_id,
666                update_machine_config: *update_machine_config,
667            },
668            Self::PrepareIteration => Self::PrepareIteration,
669            Self::WaitingLlm {
670                effect_id,
671                request,
672                driver_state,
673            } => Self::WaitingLlm {
674                effect_id: *effect_id,
675                request: Arc::clone(request),
676                driver_state: driver_state.clone(),
677            },
678            Self::WaitingTools { effect_id, calls } => Self::WaitingTools {
679                effect_id: *effect_id,
680                calls: calls.clone(),
681            },
682            Self::WaitingExec {
683                effect_id,
684                code,
685                driver_state,
686            } => Self::WaitingExec {
687                effect_id: *effect_id,
688                code: code.clone(),
689                driver_state: driver_state.clone(),
690            },
691            Self::WaitingCheckpoint {
692                effect_id,
693                checkpoint,
694                on_empty,
695            } => Self::WaitingCheckpoint {
696                effect_id: *effect_id,
697                checkpoint: *checkpoint,
698                on_empty: on_empty.clone(),
699            },
700            Self::Finished => Self::Finished,
701        }
702    }
703}
704
705impl<M: TurnProtocol> MachineState<M> {
706    fn outstanding_effect_id(&self) -> Option<EffectId> {
707        match self {
708            Self::WaitingExecutionSurface { effect_id, .. }
709            | Self::WaitingLlm { effect_id, .. }
710            | Self::WaitingTools { effect_id, .. }
711            | Self::WaitingExec { effect_id, .. }
712            | Self::WaitingCheckpoint { effect_id, .. } => Some(*effect_id),
713            Self::PreparingProtocol | Self::PrepareIteration | Self::Finished => None,
714        }
715    }
716
717    fn outstanding_effect(&self) -> Option<Effect<M>> {
718        match self {
719            Self::WaitingExecutionSurface {
720                effect_id,
721                update_machine_config,
722            } => Some(Effect::SyncExecutionSurface {
723                id: *effect_id,
724                update_machine_config: *update_machine_config,
725            }),
726            Self::WaitingLlm {
727                effect_id, request, ..
728            } => Some(Effect::LlmCall {
729                id: *effect_id,
730                request: Arc::clone(request),
731            }),
732            Self::WaitingTools { effect_id, calls } => Some(Effect::ToolCalls {
733                id: *effect_id,
734                calls: calls.clone(),
735            }),
736            Self::WaitingExec {
737                effect_id, code, ..
738            } => Some(Effect::ExecCode {
739                id: *effect_id,
740                code: code.clone(),
741            }),
742            Self::WaitingCheckpoint {
743                effect_id,
744                checkpoint,
745                ..
746            } => Some(Effect::Checkpoint {
747                id: *effect_id,
748                checkpoint: *checkpoint,
749            }),
750            Self::PreparingProtocol | Self::PrepareIteration | Self::Finished => None,
751        }
752    }
753}
754
/// Sans-IO state machine for a single session run (multi-turn).
pub struct TurnMachine<M: TurnProtocol = UnitTurnProtocol> {
    config: TurnMachineConfig<M>,
    state: MachineState<M>,
    /// Effects queued for the host, in emission order.
    pending_effects: VecDeque<Effect<M>>,
    /// Set on restore when the state has an outstanding effect that is not in
    /// the restored queue.
    active_effect_redelivery: bool,
    /// Next value handed out by `next_id`; starts at 1.
    next_effect_id: u64,
    /// Counter for synthetic message ids; starts at the initial message count.
    next_synthetic_message_id: u64,
    messages: MessageSequence,
    events: Arc<Vec<SessionEventRecord<M::Event>>>,
    turn_causes: Vec<TurnCause>,
    /// Starts at the number of events the machine was created with.
    progress_event_cursor: usize,
    /// Starts at `protocol_run_offset`.
    protocol_iteration: usize,
    protocol_run_offset: usize,
    cumulative_usage: TokenUsage,
    termination: TurnTerminationPolicyState,
    synced_protocol_iteration: Option<usize>,
}
773
774impl<M: TurnProtocol> TurnMachine<M> {
775    /// Create a new machine in `PrepareIteration` state.
776    pub fn new(
777        config: TurnMachineConfig<M>,
778        messages: Vec<Message>,
779        events: Arc<Vec<SessionEventRecord<M::Event>>>,
780        protocol_run_offset: usize,
781    ) -> Self {
782        Self::new_shared(
783            config,
784            MessageSequence::from_owned(messages),
785            events,
786            protocol_run_offset,
787        )
788    }
789
790    pub fn new_shared(
791        config: TurnMachineConfig<M>,
792        messages: MessageSequence,
793        events: Arc<Vec<SessionEventRecord<M::Event>>>,
794        protocol_run_offset: usize,
795    ) -> Self {
796        Self::new_shared_with_turn_causes(config, messages, events, protocol_run_offset, Vec::new())
797    }
798
799    pub fn new_shared_with_turn_causes(
800        config: TurnMachineConfig<M>,
801        messages: MessageSequence,
802        events: Arc<Vec<SessionEventRecord<M::Event>>>,
803        protocol_run_offset: usize,
804        turn_causes: Vec<TurnCause>,
805    ) -> Self {
806        let next_synthetic_message_id = messages.len() as u64;
807        Self {
808            config,
809            state: MachineState::PreparingProtocol,
810            pending_effects: VecDeque::new(),
811            active_effect_redelivery: false,
812            next_effect_id: 1,
813            next_synthetic_message_id,
814            messages,
815            progress_event_cursor: events.len(),
816            events,
817            turn_causes,
818            protocol_iteration: protocol_run_offset,
819            protocol_run_offset,
820            cumulative_usage: TokenUsage::default(),
821            termination: TurnTerminationPolicyState::new(),
822            synced_protocol_iteration: None,
823        }
824    }
825
826    /// Whether the machine has finished.
827    pub fn is_done(&self) -> bool {
828        matches!(self.state, MachineState::Finished)
829    }
830
831    pub fn messages(&self) -> Arc<Vec<Message>> {
832        self.messages.shared()
833    }
834
835    pub fn events(&self) -> Arc<Vec<SessionEventRecord<M::Event>>> {
836        Arc::clone(&self.events)
837    }
838
839    pub fn message_sequence(&self) -> MessageSequence {
840        self.messages.clone()
841    }
842
843    pub fn protocol_iteration(&self) -> usize {
844        self.protocol_iteration
845    }
846
847    pub fn checkpoint(&self) -> TurnCheckpoint<M> {
848        let active_effect_id = self.state.outstanding_effect_id();
849        let pending_effects = self
850            .pending_effects
851            .iter()
852            .filter(|effect| active_effect_id.is_none_or(|id| effect.id() != Some(id)))
853            .cloned()
854            .collect::<Vec<_>>();
855        TurnCheckpoint {
856            state: self.state.clone(),
857            pending_effects,
858            next_effect_id: self.next_effect_id,
859            next_synthetic_message_id: self.next_synthetic_message_id,
860            messages: self.messages.iter().cloned().collect(),
861            events: self.events.as_ref().clone(),
862            turn_causes: self.turn_causes.clone(),
863            progress_event_cursor: self.progress_event_cursor,
864            protocol_iteration: self.protocol_iteration,
865            protocol_run_offset: self.protocol_run_offset,
866            cumulative_usage: self.cumulative_usage.clone(),
867            termination: self.termination.clone(),
868            synced_protocol_iteration: self.synced_protocol_iteration,
869        }
870    }
871
872    pub fn restore_from_checkpoint(
873        config: TurnMachineConfig<M>,
874        checkpoint: TurnCheckpoint<M>,
875    ) -> Self {
876        let active_effect_id = checkpoint.state.outstanding_effect_id();
877        let pending_effects = checkpoint
878            .pending_effects
879            .into_iter()
880            .collect::<VecDeque<_>>();
881        let active_effect_redelivery = active_effect_id.is_some()
882            && !pending_effects
883                .iter()
884                .any(|effect| effect.id() == active_effect_id);
885        Self {
886            config,
887            state: checkpoint.state,
888            pending_effects,
889            active_effect_redelivery,
890            next_effect_id: checkpoint.next_effect_id,
891            next_synthetic_message_id: checkpoint.next_synthetic_message_id,
892            messages: MessageSequence::from_owned(checkpoint.messages),
893            events: Arc::new(checkpoint.events),
894            turn_causes: checkpoint.turn_causes,
895            progress_event_cursor: checkpoint.progress_event_cursor,
896            protocol_iteration: checkpoint.protocol_iteration,
897            protocol_run_offset: checkpoint.protocol_run_offset,
898            cumulative_usage: checkpoint.cumulative_usage,
899            termination: checkpoint.termination,
900            synced_protocol_iteration: checkpoint.synced_protocol_iteration,
901        }
902    }
903
904    fn driver_context(&self) -> DriverContextView<'_, M> {
905        DriverContextView {
906            config: &self.config,
907            messages: &self.messages,
908            events: self.events.as_slice(),
909            turn_causes: &self.turn_causes,
910            protocol_iteration: self.protocol_iteration,
911            protocol_run_offset: self.protocol_run_offset,
912            termination: &self.termination,
913        }
914    }
915
916    fn next_id(&mut self) -> EffectId {
917        let id = EffectId(self.next_effect_id);
918        self.next_effect_id += 1;
919        id
920    }
921
922    fn next_synthetic_message_id(&mut self, scope: &str) -> String {
923        let id = format!(
924            "m_sansio_{}_{}_{}",
925            self.protocol_run_offset, scope, self.next_synthetic_message_id
926        );
927        self.next_synthetic_message_id += 1;
928        id
929    }
930
931    fn emit(&mut self, event: SessionEvent) {
932        self.pending_effects.push_back(Effect::Emit(event));
933    }
934
935    fn emit_progress(&mut self) {
936        let event_delta = self.next_event_delta();
937        self.pending_effects.push_back(Effect::Progress {
938            messages: self.messages.clone(),
939            event_delta,
940            protocol_iteration: self.protocol_iteration,
941        });
942    }
943
944    pub fn fail_turn(&mut self, event: SessionEvent) {
945        self.emit(event);
946        self.finish(TurnOutcome::Stopped(TurnStop::RuntimeError));
947    }
948
949    pub fn finish_with_outcome(&mut self, outcome: TurnOutcome) {
950        self.finish(outcome);
951    }
952
953    fn finish(&mut self, outcome: TurnOutcome) {
954        self.emit(SessionEvent::TurnOutcome { outcome });
955        self.emit(SessionEvent::Done);
956        let msgs = std::mem::take(&mut self.messages);
957        let event_delta = self.next_event_delta();
958        let protocol_iteration = self.protocol_iteration;
959        self.state = MachineState::Finished;
960        self.pending_effects.push_back(Effect::Done {
961            messages: msgs,
962            event_delta,
963            protocol_iteration,
964        });
965    }
966
967    fn next_event_delta(&mut self) -> Vec<SessionEventRecord<M::Event>> {
968        if self.progress_event_cursor >= self.events.len() {
969            self.progress_event_cursor = self.events.len();
970            return Vec::new();
971        }
972        let delta = self.events[self.progress_event_cursor..].to_vec();
973        self.progress_event_cursor = self.events.len();
974        delta
975    }
976
977    /// Drain the next pending effect. Returns `None` when the host must call
978    /// `handle_response()` before more effects become available.
979    pub fn poll_effect(&mut self) -> Option<Effect<M>> {
980        if let Some(effect) = self.pending_effects.pop_front() {
981            return Some(effect);
982        }
983        if self.active_effect_redelivery {
984            self.active_effect_redelivery = false;
985            if let Some(effect) = self.state.outstanding_effect() {
986                return Some(effect);
987            }
988        }
989
990        match &self.state {
991            MachineState::PreparingProtocol => {
992                self.prepare_protocol();
993                self.pending_effects.pop_front()
994            }
995            MachineState::PrepareIteration => {
996                self.prepare_protocol_iteration();
997                self.pending_effects.pop_front()
998            }
999            _ => None,
1000        }
1001    }
1002
1003    // ─── State transitions ───
1004
1005    fn prepare_protocol(&mut self) {
1006        if self.config.sync_execution_surface {
1007            let id = self.next_id();
1008            self.state = MachineState::WaitingExecutionSurface {
1009                effect_id: id,
1010                update_machine_config: false,
1011            };
1012            self.pending_effects
1013                .push_back(Effect::SyncExecutionSurface {
1014                    id,
1015                    update_machine_config: false,
1016                });
1017            return;
1018        }
1019
1020        self.prepare_protocol_iteration();
1021    }
1022
1023    fn prepare_protocol_iteration(&mut self) {
1024        if self.config.sync_execution_surface
1025            && self.synced_protocol_iteration != Some(self.protocol_iteration)
1026        {
1027            let id = self.next_id();
1028            self.state = MachineState::WaitingExecutionSurface {
1029                effect_id: id,
1030                update_machine_config: true,
1031            };
1032            self.pending_effects
1033                .push_back(Effect::SyncExecutionSurface {
1034                    id,
1035                    update_machine_config: true,
1036                });
1037            return;
1038        }
1039        let actions = {
1040            let driver = Arc::clone(&self.config.protocol_driver);
1041            let ctx = self.driver_context();
1042            driver.prepare_protocol_iteration(ctx)
1043        };
1044        self.apply_actions(actions);
1045    }
1046
1047    fn start_llm_request(
1048        &mut self,
1049        request: Arc<LlmRequest>,
1050        driver_state: Option<M::DriverState>,
1051    ) {
1052        let tool_list = self
1053            .config
1054            .tool_specs
1055            .iter()
1056            .map(|tool| tool.name.as_str())
1057            .collect::<Vec<_>>()
1058            .join(", ");
1059        self.emit(SessionEvent::LlmRequest {
1060            protocol_iteration: self.protocol_iteration,
1061            message_count: self.messages.len(),
1062            tool_list,
1063        });
1064
1065        let id = self.next_id();
1066        self.state = MachineState::WaitingLlm {
1067            effect_id: id,
1068            request: Arc::clone(&request),
1069            driver_state,
1070        };
1071        self.pending_effects
1072            .push_back(Effect::LlmCall { id, request });
1073    }
1074
1075    fn start_tool_calls(&mut self, calls: Vec<PendingToolCall>) {
1076        let effect_id = self.next_id();
1077        self.state = MachineState::WaitingTools {
1078            effect_id,
1079            calls: calls.clone(),
1080        };
1081        self.pending_effects.push_back(Effect::ToolCalls {
1082            id: effect_id,
1083            calls,
1084        });
1085    }
1086
1087    fn start_exec(&mut self, code: String, driver_state: M::DriverState) {
1088        let effect_id = self.next_id();
1089        self.state = MachineState::WaitingExec {
1090            effect_id,
1091            code: code.clone(),
1092            driver_state,
1093        };
1094        self.pending_effects.push_back(Effect::ExecCode {
1095            id: effect_id,
1096            code,
1097        });
1098    }
1099
1100    fn schedule_turn_limit_final(&mut self, message: Message) -> bool {
1101        let Some(_max_turns) = self.termination.turn_limit_final_to_schedule(
1102            self.protocol_iteration,
1103            self.protocol_run_offset,
1104            self.config.max_turns,
1105        ) else {
1106            return false;
1107        };
1108        self.termination.mark_turn_limit_final_scheduled();
1109        self.messages.push(message);
1110        true
1111    }
1112
1113    fn schedule_configured_turn_limit_final(&mut self) -> bool {
1114        let Some(max_turns) = self.termination.turn_limit_final_to_schedule(
1115            self.protocol_iteration,
1116            self.protocol_run_offset,
1117            self.config.max_turns,
1118        ) else {
1119            return false;
1120        };
1121        let message_id = self.next_synthetic_message_id("turn_limit");
1122        let message = (self.config.turn_limit_final_message)(message_id, max_turns);
1123        self.termination.mark_turn_limit_final_scheduled();
1124        self.messages.push(message);
1125        true
1126    }
1127
1128    fn append_event(&mut self, event: SessionEventRecord<M::Event>) {
1129        match event {
1130            SessionEventRecord::Conversation(record) => {
1131                Arc::make_mut(&mut self.events)
1132                    .push(SessionEventRecord::Conversation(record.clone()));
1133                self.messages.push(record.to_message());
1134            }
1135            SessionEventRecord::Tool(ToolEvent::Invocation { stable_key, record }) => {
1136                Arc::make_mut(&mut self.events).push(SessionEventRecord::Tool(
1137                    ToolEvent::Invocation { stable_key, record },
1138                ));
1139            }
1140            SessionEventRecord::Protocol(protocol_event) => {
1141                Arc::make_mut(&mut self.events).push(SessionEventRecord::Protocol(protocol_event));
1142            }
1143        }
1144    }
1145
1146    pub fn apply_actions(&mut self, actions: Vec<DriverAction<M>>) {
1147        let mut progress_dirty = false;
1148        for action in actions {
1149            match action {
1150                DriverAction::Emit(event) => self.emit(event),
1151                DriverAction::AppendEvents(events) => {
1152                    if !events.is_empty() {
1153                        for event in events {
1154                            self.append_event(event);
1155                        }
1156                        progress_dirty = true;
1157                    }
1158                }
1159                DriverAction::StartLlm {
1160                    request,
1161                    driver_state,
1162                } => self.start_llm_request(request, driver_state),
1163                DriverAction::StartTools { calls } => self.start_tool_calls(calls),
1164                DriverAction::StartExec { code, driver_state } => {
1165                    self.start_exec(code, driver_state)
1166                }
1167                DriverAction::StartCheckpoint {
1168                    checkpoint,
1169                    on_empty,
1170                } => self.request_checkpoint(checkpoint, on_empty),
1171                DriverAction::AdvanceProtocolIteration => {
1172                    self.protocol_iteration += 1;
1173                    self.synced_protocol_iteration = None;
1174                    progress_dirty = true;
1175                }
1176                DriverAction::ScheduleTurnLimitFinal { message } => {
1177                    if self.schedule_turn_limit_final(message) {
1178                        progress_dirty = true;
1179                    }
1180                }
1181                DriverAction::Finish(outcome) => {
1182                    if progress_dirty {
1183                        self.emit_progress();
1184                        progress_dirty = false;
1185                    }
1186                    self.finish(outcome);
1187                    break;
1188                }
1189            }
1190        }
1191        if progress_dirty {
1192            self.emit_progress();
1193        }
1194    }
1195
1196    /// Feed a response to a previously emitted effect.
1197    pub fn handle_response(&mut self, response: Response) {
1198        self.active_effect_redelivery = false;
1199        match response {
1200            Response::ExecutionSurfaceSynced { id, result } => {
1201                self.handle_execution_surface_synced(id, result)
1202            }
1203            Response::LlmComplete {
1204                id,
1205                result,
1206                text_streamed,
1207            } => self.handle_llm_complete(id, result, text_streamed),
1208            Response::ToolResults { id, results } => self.handle_tool_results(id, results),
1209            Response::ExecResult { id, result } => self.handle_exec_result(id, result),
1210            Response::Checkpoint { id, delivery } => self.handle_checkpoint(id, delivery),
1211        }
1212    }
1213
1214    fn request_checkpoint(&mut self, checkpoint: CheckpointKind, on_empty: CheckpointResumeAction) {
1215        let id = self.next_id();
1216        self.state = MachineState::WaitingCheckpoint {
1217            effect_id: id,
1218            checkpoint,
1219            on_empty,
1220        };
1221        self.pending_effects
1222            .push_back(Effect::Checkpoint { id, checkpoint });
1223    }
1224
1225    fn handle_execution_surface_synced(
1226        &mut self,
1227        id: EffectId,
1228        result: Result<Option<ExecutionSurfaceSync>, String>,
1229    ) {
1230        let (waiting_id, waiting_update_machine_config) =
1231            match std::mem::replace(&mut self.state, MachineState::Finished) {
1232                MachineState::WaitingExecutionSurface {
1233                    effect_id,
1234                    update_machine_config,
1235                } => (effect_id, update_machine_config),
1236                other => {
1237                    self.state = other;
1238                    return;
1239                }
1240            };
1241        if waiting_id != id {
1242            self.state = MachineState::WaitingExecutionSurface {
1243                effect_id: waiting_id,
1244                update_machine_config: waiting_update_machine_config,
1245            };
1246            return;
1247        }
1248
1249        match result {
1250            Ok(update) => {
1251                if let Some(update) = update {
1252                    self.config.system_prompt = update.system_prompt;
1253                    self.config.tool_specs = update.tool_specs;
1254                }
1255                self.synced_protocol_iteration = Some(self.protocol_iteration);
1256                self.state = MachineState::PrepareIteration;
1257            }
1258            Err(error) => {
1259                self.fail_turn(make_error_event(
1260                    "execution_surface",
1261                    Some("reconfigure_failed"),
1262                    format!("Failed to refresh execution surface: {error}"),
1263                    Some(error),
1264                ));
1265            }
1266        }
1267    }
1268
1269    fn append_checkpoint_messages(&mut self, plugin_messages: &[PluginMessage], transient: bool) {
1270        let mut appended = Vec::new();
1271        for message in plugin_messages
1272            .iter()
1273            .filter(|message| matches!(message.role, MessageRole::User | MessageRole::System))
1274        {
1275            let message_id = self.next_synthetic_message_id("checkpoint");
1276            let mut parts = if message.parts.is_empty() {
1277                vec![Part {
1278                    id: format!("{message_id}.p0"),
1279                    kind: PartKind::Text,
1280                    content: message.content.clone(),
1281                    attachment: None,
1282                    tool_call_id: None,
1283                    tool_name: None,
1284                    tool_replay: None,
1285                    prune_state: PruneState::Intact,
1286                    reasoning_meta: None,
1287                    response_meta: None,
1288                }]
1289            } else {
1290                message.parts.clone()
1291            };
1292            reassign_part_ids(&message_id, &mut parts);
1293            appended.push(Message {
1294                id: message_id.clone(),
1295                role: message.role,
1296                parts: Arc::new(parts),
1297                origin: message.origin.clone().or_else(|| {
1298                    Some(MessageOrigin::Plugin {
1299                        plugin_id: "plugin".to_string(),
1300                        transient,
1301                    })
1302                }),
1303            });
1304        }
1305        if !appended.is_empty() {
1306            self.messages.extend(appended);
1307        }
1308    }
1309
1310    fn append_turn_causes(&mut self, causes: Vec<TurnCause>) {
1311        if causes.is_empty() {
1312            return;
1313        }
1314        let mut existing_ids = self
1315            .turn_causes
1316            .iter()
1317            .map(|cause| cause.id.clone())
1318            .collect::<HashSet<_>>();
1319        for cause in causes {
1320            if !existing_ids.insert(cause.id.clone()) {
1321                continue;
1322            }
1323            self.messages.push(cause.to_event_message());
1324            self.turn_causes.push(cause);
1325        }
1326    }
1327
1328    fn handle_checkpoint(&mut self, id: EffectId, delivery: CheckpointDelivery) {
1329        let (effect_id, checkpoint, on_empty) =
1330            match std::mem::replace(&mut self.state, MachineState::Finished) {
1331                MachineState::WaitingCheckpoint {
1332                    effect_id,
1333                    checkpoint,
1334                    on_empty,
1335                } => (effect_id, checkpoint, on_empty),
1336                other => {
1337                    self.state = other;
1338                    return;
1339                }
1340            };
1341        if effect_id != id {
1342            self.state = MachineState::WaitingCheckpoint {
1343                effect_id,
1344                checkpoint,
1345                on_empty,
1346            };
1347            return;
1348        }
1349
1350        if !delivery.messages.is_empty()
1351            || !delivery.transient_messages.is_empty()
1352            || !delivery.turn_causes.is_empty()
1353        {
1354            self.append_checkpoint_messages(&delivery.messages, false);
1355            self.append_checkpoint_messages(&delivery.transient_messages, true);
1356            self.append_turn_causes(delivery.turn_causes);
1357            if matches!(checkpoint, CheckpointKind::BeforeCompletion) {
1358                self.protocol_iteration += 1;
1359                if self.termination.should_force_exit_after_grace_turn() {
1360                    self.emit_progress();
1361                    self.finish(TurnOutcome::Stopped(TurnStop::MaxTurns));
1362                    return;
1363                }
1364                self.schedule_configured_turn_limit_final();
1365            }
1366            self.state = MachineState::PrepareIteration;
1367            self.emit_progress();
1368            return;
1369        }
1370
1371        match on_empty {
1372            CheckpointResumeAction::PrepareIteration => {
1373                self.state = MachineState::PrepareIteration;
1374            }
1375            CheckpointResumeAction::Finish(outcome) => self.finish(outcome),
1376        }
1377    }
1378
1379    fn take_waiting_llm_state(&mut self, id: EffectId) -> Option<WaitingLlmState<M>> {
1380        match std::mem::replace(&mut self.state, MachineState::Finished) {
1381            MachineState::WaitingLlm {
1382                effect_id,
1383                request,
1384                driver_state,
1385            } if effect_id == id => Some(WaitingLlmState {
1386                request,
1387                driver_state,
1388            }),
1389            other => {
1390                self.state = other;
1391                None
1392            }
1393        }
1394    }
1395
1396    fn handle_llm_complete(
1397        &mut self,
1398        id: EffectId,
1399        result: Result<LlmResponse, LlmCallError>,
1400        text_streamed: bool,
1401    ) {
1402        let Some(waiting) = self.take_waiting_llm_state(id) else {
1403            return;
1404        };
1405        match result {
1406            Err(error) => {
1407                self.emit_llm_error(error);
1408            }
1409            Ok(mut llm_response) => {
1410                // Reclassify a zero-output `OutputLimit` as `ContextOverflow`
1411                // when the prompt nearly filled the window, before the terminal
1412                // reason drives the finish decision below.
1413                refine_terminal_reason_for_context_window(
1414                    &mut llm_response,
1415                    self.config.max_context_tokens,
1416                );
1417                self.record_llm_usage(&llm_response, self.llm_response_text(&llm_response));
1418                if self.handle_terminal_llm_response(&llm_response, text_streamed) {
1419                    return;
1420                }
1421                let actions = {
1422                    let driver = Arc::clone(&self.config.protocol_driver);
1423                    let ctx = self.driver_context();
1424                    driver.handle_llm_success(ctx, waiting, llm_response, text_streamed)
1425                };
1426                self.apply_actions(actions);
1427            }
1428        }
1429    }
1430
1431    fn handle_terminal_llm_response(
1432        &mut self,
1433        llm_response: &LlmResponse,
1434        text_streamed: bool,
1435    ) -> bool {
1436        let outcome = match llm_response.terminal_reason {
1437            LlmTerminalReason::OutputLimit => TurnOutcome::Stopped(TurnStop::Incomplete),
1438            LlmTerminalReason::ContextOverflow => TurnOutcome::Stopped(TurnStop::ProviderError),
1439            LlmTerminalReason::ContentFilter => TurnOutcome::Stopped(TurnStop::ProviderError),
1440            LlmTerminalReason::ProviderError => TurnOutcome::Stopped(TurnStop::ProviderError),
1441            LlmTerminalReason::Cancelled => TurnOutcome::Stopped(TurnStop::Cancelled),
1442            LlmTerminalReason::Stop | LlmTerminalReason::ToolUse | LlmTerminalReason::Unknown => {
1443                return false;
1444            }
1445        };
1446
1447        if !text_streamed && !llm_response.full_text.is_empty() {
1448            self.emit(SessionEvent::TextDelta {
1449                content: llm_response.full_text.clone(),
1450            });
1451        }
1452        self.emit(SessionEvent::LlmResponse {
1453            protocol_iteration: self.protocol_iteration,
1454            content: llm_response.full_text.clone(),
1455            duration_ms: 0,
1456        });
1457        let reason = llm_response.terminal_reason;
1458        let diagnostic = llm_response
1459            .terminal_diagnostic
1460            .clone()
1461            .unwrap_or_else(|| format!("Model call ended with terminal reason {reason:?}."));
1462        self.emit(SessionEvent::Error {
1463            message: diagnostic.clone(),
1464            envelope: Some(crate::session_model::make_error_envelope(
1465                "llm_provider",
1466                Some(reason.code()),
1467                Some(reason),
1468                diagnostic,
1469                None,
1470            )),
1471        });
1472        self.finish(outcome);
1473        true
1474    }
1475
1476    fn llm_response_text<'a>(&self, llm_response: &'a LlmResponse) -> &'a str {
1477        &llm_response.full_text
1478    }
1479
1480    fn llm_response_debug_parts(&self, llm_response: &LlmResponse) -> Option<Value> {
1481        let parts = llm_response
1482            .parts
1483            .iter()
1484            .filter_map(|part| match part {
1485                LlmOutputPart::Text { text, .. } if !text.is_empty() => Some(serde_json::json!({
1486                    "type": "text",
1487                    "text": text,
1488                })),
1489                LlmOutputPart::Text { .. } => None,
1490                LlmOutputPart::Reasoning {
1491                    text,
1492                    replay,
1493                } => Some(serde_json::json!({
1494                    "type": "reasoning",
1495                    "id": replay.as_ref().and_then(|meta| meta.item_id.as_ref()),
1496                    "summary": replay.as_ref().map(|meta| &meta.summary),
1497                    "text": text,
1498                    "has_encrypted": replay.as_ref().is_some_and(|meta| meta.encrypted_content.is_some() || meta.signature.is_some()),
1499                    "redacted": replay.as_ref().is_some_and(|meta| meta.redacted),
1500                })),
1501                LlmOutputPart::ToolCall {
1502                    call_id,
1503                    tool_name,
1504                    input_json,
1505                    replay,
1506                } => Some(serde_json::json!({
1507                    "type": "tool_call",
1508                    "call_id": call_id,
1509                    "tool_name": tool_name,
1510                    "input_json": input_json,
1511                    "id": replay.as_ref().and_then(|meta| meta.item_id.as_ref()),
1512                    "has_opaque": replay.as_ref().is_some_and(|meta| meta.opaque.is_some()),
1513                })),
1514            })
1515            .collect::<Vec<_>>();
1516        (!parts.is_empty()).then_some(Value::Array(parts))
1517    }
1518
1519    fn record_llm_usage(&mut self, llm_response: &LlmResponse, response_text: &str) {
1520        let usage = token_usage_from_llm_usage(&llm_response.usage);
1521        self.cumulative_usage.add(&usage);
1522        self.emit(SessionEvent::TokenUsage {
1523            protocol_iteration: self.protocol_iteration,
1524            usage: usage.clone(),
1525            cumulative: self.cumulative_usage.clone(),
1526        });
1527        if self.config.emit_llm_trace {
1528            let response_parts = self.llm_response_debug_parts(llm_response);
1529            self.pending_effects.push_back(Effect::Log {
1530                event: LogEvent::LlmDebug {
1531                    session_id: self.config.session_id.clone(),
1532                    protocol_iteration: self.protocol_iteration,
1533                    usage,
1534                    provider_usage: llm_response.provider_usage.clone(),
1535                    request_body: llm_response.request_body.clone(),
1536                    response_text: response_text.to_string(),
1537                    response_parts,
1538                },
1539            });
1540        }
1541    }
1542
1543    fn record_llm_error(&mut self, error: &LlmCallError) {
1544        if self.config.emit_llm_trace {
1545            self.pending_effects.push_back(Effect::Log {
1546                event: LogEvent::LlmError {
1547                    session_id: self.config.session_id.clone(),
1548                    protocol_iteration: self.protocol_iteration,
1549                    request_body: error.request_body.clone(),
1550                    message: error.message.clone(),
1551                    retryable: error.retryable,
1552                    raw: error.raw.clone(),
1553                    code: error.code.clone(),
1554                    terminal_reason: error.terminal_reason,
1555                },
1556            });
1557        }
1558    }
1559
1560    fn emit_llm_error(&mut self, error: LlmCallError) {
1561        self.record_llm_error(&error);
1562        self.emit(SessionEvent::Error {
1563            message: format!("LLM error: {}", error.message),
1564            envelope: Some(crate::session_model::make_error_envelope(
1565                "llm_provider",
1566                error.code.as_deref(),
1567                Some(error.terminal_reason),
1568                format!("LLM error: {}", error.message),
1569                error.raw,
1570            )),
1571        });
1572        self.finish(TurnOutcome::Stopped(TurnStop::ProviderError));
1573    }
1574
1575    fn handle_tool_results(&mut self, id: EffectId, completed: Vec<CompletedToolCall>) {
1576        let (waiting_effect_id, waiting_calls) =
1577            match std::mem::replace(&mut self.state, MachineState::Finished) {
1578                MachineState::WaitingTools { effect_id, calls } => (effect_id, calls),
1579                other => {
1580                    self.state = other;
1581                    return;
1582                }
1583            };
1584
1585        if waiting_effect_id != id {
1586            self.state = MachineState::WaitingTools {
1587                effect_id: waiting_effect_id,
1588                calls: waiting_calls,
1589            };
1590            return;
1591        }
1592
1593        for outcome in &completed {
1594            self.emit(SessionEvent::ToolCall {
1595                call_id: Some(outcome.call_id.clone()),
1596                name: outcome.tool_name.clone(),
1597                args: outcome.args.clone(),
1598                output: outcome.output.clone(),
1599                duration_ms: outcome.duration_ms,
1600            });
1601        }
1602
1603        let actions = {
1604            let driver = Arc::clone(&self.config.protocol_driver);
1605            let ctx = self.driver_context();
1606            driver.handle_tool_results(ctx, completed)
1607        };
1608        self.apply_actions(actions);
1609    }
1610
1611    fn take_waiting_exec_state(&mut self, id: EffectId) -> Option<WaitingExecState<M>> {
1612        match std::mem::replace(&mut self.state, MachineState::Finished) {
1613            MachineState::WaitingExec {
1614                effect_id,
1615                code: _,
1616                driver_state,
1617            } if effect_id == id => Some(WaitingExecState { driver_state }),
1618            other => {
1619                self.state = other;
1620                None
1621            }
1622        }
1623    }
1624
1625    fn handle_exec_result(&mut self, id: EffectId, result: Result<crate::ExecResponse, String>) {
1626        let Some(waiting) = self.take_waiting_exec_state(id) else {
1627            return;
1628        };
1629        let actions = {
1630            let driver = Arc::clone(&self.config.protocol_driver);
1631            let ctx = self.driver_context();
1632            driver.handle_exec_result(ctx, waiting, result)
1633        };
1634        self.apply_actions(actions);
1635    }
1636}
1637
1638fn token_usage_from_llm_usage(usage: &crate::llm::types::LlmUsage) -> TokenUsage {
1639    TokenUsage {
1640        input_tokens: usage.input_tokens,
1641        output_tokens: usage.output_tokens,
1642        cached_input_tokens: usage.cached_input_tokens,
1643        reasoning_tokens: usage.reasoning_tokens,
1644    }
1645}
1646
1647/// Reclassify a zero-output `OutputLimit` terminal reason as `ContextOverflow`
1648/// when the prompt nearly filled the model's context window.
1649///
1650/// Pure policy: the kernel owns the terminal-reason interpretation, so the
1651/// provider's raw reason is refined here (before it drives the finish decision
1652/// in `handle_terminal_llm_response`) rather than in the host I/O layer. A
1653/// `None` window disables the refinement.
1654fn refine_terminal_reason_for_context_window(
1655    response: &mut LlmResponse,
1656    max_context_tokens: Option<usize>,
1657) {
1658    if response.terminal_reason != LlmTerminalReason::OutputLimit {
1659        return;
1660    }
1661    if response.usage.output_tokens != 0 {
1662        return;
1663    }
1664    let Some(max_context_tokens) = max_context_tokens.filter(|value| *value > 0) else {
1665        return;
1666    };
1667    let prompt_tokens = response
1668        .usage
1669        .input_tokens
1670        .saturating_add(response.usage.cached_input_tokens)
1671        .max(0) as usize;
1672    if prompt_tokens >= max_context_tokens.saturating_mul(95) / 100 {
1673        response.terminal_reason = LlmTerminalReason::ContextOverflow;
1674        response.terminal_diagnostic = Some(
1675            "Model produced no output because the prompt reached the configured context window."
1676                .to_string(),
1677        );
1678    }
1679}
1680
1681#[cfg(test)]
1682mod tests;