Skip to main content

lash_sansio/sansio/
mod.rs

1//! Sans-IO state machine for session turns.
2//!
3//! `TurnMachine` owns the generic effect engine. Protocol-specific behavior
4//! lives behind `ProtocolDriverHandle`, which returns declarative
5//! `DriverAction`s that the machine applies.
6
7use std::collections::VecDeque;
8use std::fmt::Debug;
9use std::sync::Arc;
10use std::time::Duration;
11
12use serde::Serialize;
13use serde::de::DeserializeOwned;
14use serde_json::Value;
15
16use crate::llm::types::{
17    LlmAttachment, LlmOutputPart, LlmRequest, LlmResponse, LlmTerminalReason, LlmToolChoice,
18    LlmToolSpec, ProviderReplayMeta,
19};
20use crate::session_model::message::MessageOrigin;
21use crate::session_model::{
22    Message, MessageRole, MessageSequence, Part, PartKind, PruneState, SessionEvent,
23    SessionEventRecord, TokenUsage, ToolEvent, TurnTerminationPolicyState, fresh_message_id,
24    make_error_event, reassign_part_ids,
25};
26use crate::{
27    CheckpointKind, ModelToolReturn, PluginMessage, ToolCallOutput, TurnOutcome, TurnStop,
28};
29
30// ─── Public types ───
31
32pub trait ModeProtocol: Send + Sync + 'static {
33    type Event: Clone + Serialize + DeserializeOwned + Debug + Send + Sync + 'static;
34    type Termination: Clone + Default + Debug + Send + Sync + 'static;
35    type DriverState: Clone + Default + Serialize + DeserializeOwned + Debug + Send + Sync + 'static;
36}
37
38#[derive(Clone, Debug, Serialize, serde::Deserialize)]
39pub struct UnitModeProtocol;
40
41impl ModeProtocol for UnitModeProtocol {
42    type Event = ();
43    type Termination = ();
44    type DriverState = serde_json::Value;
45}
46
47/// Opaque identifier linking an effect to its response.
48#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, serde::Deserialize)]
49pub struct EffectId(pub u64);
50
51#[derive(Clone, Debug, Serialize, serde::Deserialize)]
52pub struct PendingToolCall {
53    pub call_id: String,
54    pub tool_name: String,
55    pub args: Value,
56    /// Opaque provider replay state carried through for the next request.
57    pub replay: Option<ProviderReplayMeta>,
58}
59
60#[derive(Clone, Debug, Serialize, serde::Deserialize)]
61pub struct CompletedToolCall {
62    pub call_id: String,
63    pub tool_name: String,
64    pub args: Value,
65    pub output: ToolCallOutput,
66    pub model_return: ModelToolReturn,
67    pub duration_ms: u64,
68    /// See [`PendingToolCall::replay`].
69    pub replay: Option<ProviderReplayMeta>,
70}
71
72#[derive(Clone, Debug, Serialize, serde::Deserialize)]
73pub enum LogEvent {
74    LlmDebug {
75        session_id: String,
76        mode_iteration: usize,
77        usage: TokenUsage,
78        provider_usage: Option<Value>,
79        request_body: Option<String>,
80        response_text: String,
81        response_parts: Option<Value>,
82    },
83    LlmError {
84        session_id: String,
85        mode_iteration: usize,
86        request_body: Option<String>,
87        message: String,
88        retryable: bool,
89        raw: Option<String>,
90        code: Option<String>,
91        terminal_reason: LlmTerminalReason,
92    },
93}
94
95/// An effect the host must fulfil.
96#[derive(Debug)]
97#[allow(clippy::large_enum_variant)]
98pub enum Effect<M: ModeProtocol = UnitModeProtocol> {
99    /// Sync the live execution surface before the turn proceeds.
100    ///
101    /// `update_machine_config` is only needed after the turn has
102    /// already advanced at least once and the host may need to swap in
103    /// a refreshed system prompt or tool schema for the next
104    /// mode iteration. Initial syncs are host-only because the machine was
105    /// already constructed from a fresh execution surface.
106    SyncExecutionSurface {
107        id: EffectId,
108        update_machine_config: bool,
109    },
110    /// Start an LLM call.
111    LlmCall {
112        id: EffectId,
113        request: Arc<LlmRequest>,
114    },
115    /// Cancel an in-progress LLM stream.
116    CancelLlm { id: EffectId },
117    /// Execute one or more standard-mode tool calls.
118    ToolCalls {
119        id: EffectId,
120        calls: Vec<PendingToolCall>,
121    },
122    /// Execute a mode-owned code block.
123    ExecCode { id: EffectId, code: String },
124    /// Run a host/plugin checkpoint before the machine continues or completes.
125    Checkpoint {
126        id: EffectId,
127        checkpoint: CheckpointKind,
128    },
129    /// Retry backoff.
130    Sleep { id: EffectId, duration: Duration },
131    /// Host-implemented fire-and-forget logging.
132    Log { event: LogEvent },
133    /// Fire-and-forget event (no response needed).
134    Emit(SessionEvent),
135    /// Prompt-history progress that may be durably persisted by the host.
136    ///
137    /// This is separate from [`SessionEvent`]: UI stream events can be partial,
138    /// duplicated, or display-only, while `Progress` is emitted only after the
139    /// state machine has applied semantic message or mode-step changes.
140    Progress {
141        messages: MessageSequence,
142        events: Arc<Vec<SessionEventRecord<M::Event>>>,
143        mode_iteration: usize,
144    },
145    /// Turn is done.
146    Done {
147        messages: MessageSequence,
148        events: Arc<Vec<SessionEventRecord<M::Event>>>,
149        mode_iteration: usize,
150    },
151}
152
153impl<M: ModeProtocol> Effect<M> {
154    fn id(&self) -> Option<EffectId> {
155        match self {
156            Self::SyncExecutionSurface { id, .. }
157            | Self::LlmCall { id, .. }
158            | Self::CancelLlm { id }
159            | Self::ToolCalls { id, .. }
160            | Self::ExecCode { id, .. }
161            | Self::Checkpoint { id, .. }
162            | Self::Sleep { id, .. } => Some(*id),
163            Self::Log { .. } | Self::Emit(_) | Self::Progress { .. } | Self::Done { .. } => None,
164        }
165    }
166}
167
168/// Error details from a failed LLM call.
169#[derive(Clone, Debug, Serialize, serde::Deserialize)]
170#[allow(clippy::large_enum_variant)]
171pub enum CheckpointEffect<M: ModeProtocol = UnitModeProtocol> {
172    SyncExecutionSurface {
173        id: EffectId,
174        update_machine_config: bool,
175    },
176    LlmCall {
177        id: EffectId,
178        request: Arc<LlmRequest>,
179    },
180    CancelLlm {
181        id: EffectId,
182    },
183    ToolCalls {
184        id: EffectId,
185        calls: Vec<PendingToolCall>,
186    },
187    ExecCode {
188        id: EffectId,
189        code: String,
190    },
191    Checkpoint {
192        id: EffectId,
193        checkpoint: CheckpointKind,
194    },
195    Sleep {
196        id: EffectId,
197        duration: Duration,
198    },
199    Log {
200        event: LogEvent,
201    },
202    Emit(SessionEvent),
203    Progress {
204        messages: Vec<Message>,
205        events: Vec<SessionEventRecord<M::Event>>,
206        mode_iteration: usize,
207    },
208    Done {
209        messages: Vec<Message>,
210        events: Vec<SessionEventRecord<M::Event>>,
211        mode_iteration: usize,
212    },
213}
214
215#[derive(Clone, Debug, Serialize, serde::Deserialize)]
216pub struct LlmCallError {
217    pub message: String,
218    pub retryable: bool,
219    pub raw: Option<String>,
220    pub code: Option<String>,
221    pub terminal_reason: LlmTerminalReason,
222    pub request_body: Option<String>,
223}
224
225/// A response to a previously emitted effect.
226pub enum Response {
227    /// Live execution surface sync completed.
228    ExecutionSurfaceSynced {
229        id: EffectId,
230        result: Result<Option<ExecutionSurfaceSync>, String>,
231    },
232    /// Full LLM response.
233    LlmComplete {
234        id: EffectId,
235        result: Result<LlmResponse, LlmCallError>,
236        /// When true, text deltas were already emitted during streaming,
237        /// so the driver should skip emitting `TextDelta` events.
238        text_streamed: bool,
239    },
240    /// Native tool results.
241    ToolResults {
242        id: EffectId,
243        results: Vec<CompletedToolCall>,
244    },
245    /// Mode code execution result.
246    ExecResult {
247        id: EffectId,
248        result: Result<crate::ExecResponse, String>,
249    },
250    /// Checkpoint result with optional injected messages.
251    Checkpoint {
252        id: EffectId,
253        messages: Vec<PluginMessage>,
254        transient_messages: Vec<PluginMessage>,
255    },
256    /// Sleep completed.
257    Timeout { id: EffectId },
258}
259
260#[derive(Clone, Serialize, serde::Deserialize)]
261pub struct ExecutionSurfaceSync {
262    pub system_prompt: Arc<str>,
263    pub tool_specs: Arc<Vec<LlmToolSpec>>,
264}
265
266pub fn driver_state<T>(state: T) -> serde_json::Value
267where
268    T: Serialize,
269{
270    serde_json::to_value(state).expect("driver state must serialize")
271}
272
273pub struct WaitingLlmState<M: ModeProtocol = UnitModeProtocol> {
274    pub request: Arc<LlmRequest>,
275    driver_state: Option<M::DriverState>,
276}
277
278impl<M: ModeProtocol> WaitingLlmState<M> {
279    pub fn take_driver_state<T>(&mut self) -> Option<T>
280    where
281        T: DeserializeOwned,
282    {
283        self.driver_state
284            .take()
285            .and_then(|state| serde_json::to_value(state).ok())
286            .and_then(|state| serde_json::from_value(state).ok())
287    }
288}
289
290pub struct WaitingExecState<M: ModeProtocol = UnitModeProtocol> {
291    driver_state: M::DriverState,
292}
293
294impl<M: ModeProtocol> WaitingExecState<M> {
295    pub fn into_driver_state<T>(self) -> Option<T>
296    where
297        T: DeserializeOwned,
298    {
299        serde_json::to_value(self.driver_state)
300            .ok()
301            .and_then(|state| serde_json::from_value(state).ok())
302    }
303}
304
305#[derive(Clone, Debug, PartialEq, Serialize, serde::Deserialize)]
306pub enum CheckpointResumeAction {
307    PrepareIteration,
308    Finish(TurnOutcome),
309}
310
311#[allow(clippy::large_enum_variant)]
312pub enum DriverAction<M: ModeProtocol = UnitModeProtocol> {
313    Emit(SessionEvent),
314    AppendEvents(Vec<SessionEventRecord<M::Event>>),
315    StartLlm {
316        request: Arc<LlmRequest>,
317        driver_state: Option<M::DriverState>,
318    },
319    StartTools {
320        calls: Vec<PendingToolCall>,
321    },
322    StartExec {
323        code: String,
324        driver_state: M::DriverState,
325    },
326    StartCheckpoint {
327        checkpoint: CheckpointKind,
328        on_empty: CheckpointResumeAction,
329    },
330    AdvanceModeIteration,
331    ScheduleTurnLimitFinal,
332    Finish(TurnOutcome),
333}
334
335pub struct DriverContextView<'a, M: ModeProtocol = UnitModeProtocol> {
336    config: &'a TurnMachineConfig<M>,
337    messages: &'a MessageSequence,
338    events: &'a [SessionEventRecord<M::Event>],
339    mode_iteration: usize,
340    mode_run_offset: usize,
341    termination: &'a TurnTerminationPolicyState,
342}
343
344impl<'a, M: ModeProtocol> DriverContextView<'a, M> {
345    pub fn project_llm_request(&self, use_tools: bool) -> Arc<LlmRequest> {
346        self.config.projector.project(ProjectorContext {
347            config: self.config,
348            messages: self.messages,
349            events: self.events,
350            mode_iteration: self.mode_iteration,
351            use_tools,
352        })
353    }
354
355    pub fn mode_iteration(&self) -> usize {
356        self.mode_iteration
357    }
358
359    pub fn mode_run_offset(&self) -> usize {
360        self.mode_run_offset
361    }
362
363    pub fn max_turns(&self) -> Option<usize> {
364        self.config.max_turns
365    }
366
367    pub fn termination(&self) -> &M::Termination {
368        &self.config.termination
369    }
370
371    pub fn autonomous(&self) -> bool {
372        self.config.autonomous
373    }
374
375    pub fn should_force_exit_after_grace_turn(&self) -> bool {
376        self.termination.should_force_exit_after_grace_turn()
377    }
378
379    pub fn messages(&self) -> &MessageSequence {
380        self.messages
381    }
382
383    pub fn events(&self) -> &[SessionEventRecord<M::Event>] {
384        self.events
385    }
386}
387
388pub struct ProjectorContext<'a, M: ModeProtocol = UnitModeProtocol> {
389    pub config: &'a TurnMachineConfig<M>,
390    pub messages: &'a MessageSequence,
391    pub events: &'a [SessionEventRecord<M::Event>],
392    pub mode_iteration: usize,
393    pub use_tools: bool,
394}
395
396pub trait ContextProjector<M: ModeProtocol = UnitModeProtocol>: Send + Sync {
397    fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest>;
398}
399
400#[derive(Clone, Debug, Default)]
401pub struct ChatContextProjector;
402
403impl<M: ModeProtocol> ContextProjector<M> for ChatContextProjector {
404    fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest> {
405        let rendered_prompt = ctx.messages.render_prompt();
406        let attachments: Vec<LlmAttachment> = rendered_prompt.attachments;
407        let mut messages = rendered_prompt.messages;
408        if !ctx.config.system_prompt.trim().is_empty() {
409            messages.insert(
410                0,
411                crate::llm::types::LlmMessage::text(
412                    crate::llm::types::LlmRole::System,
413                    Arc::clone(&ctx.config.system_prompt),
414                ),
415            );
416        }
417
418        Arc::new(LlmRequest {
419            model: ctx.config.model.clone(),
420            messages,
421            attachments,
422            tools: if ctx.use_tools {
423                Arc::clone(&ctx.config.tool_specs)
424            } else {
425                Arc::new(Vec::new())
426            },
427            tool_choice: if ctx.use_tools {
428                LlmToolChoice::Auto
429            } else {
430                LlmToolChoice::None
431            },
432            model_variant: ctx.config.model_variant.clone(),
433            session_id: ctx.config.run_session_id.clone(),
434            output_spec: None,
435            stream_events: None,
436            provider_trace: None,
437        })
438    }
439}
440
441pub trait ProtocolDriverHandle<M: ModeProtocol = UnitModeProtocol>: Send + Sync {
442    fn prepare_mode_iteration(&self, ctx: DriverContextView<'_, M>) -> Vec<DriverAction<M>>;
443    fn handle_llm_success(
444        &self,
445        ctx: DriverContextView<'_, M>,
446        waiting: WaitingLlmState<M>,
447        llm_response: LlmResponse,
448        text_streamed: bool,
449    ) -> Vec<DriverAction<M>>;
450    fn handle_tool_results(
451        &self,
452        ctx: DriverContextView<'_, M>,
453        completed: Vec<CompletedToolCall>,
454    ) -> Vec<DriverAction<M>>;
455    fn handle_exec_result(
456        &self,
457        ctx: DriverContextView<'_, M>,
458        waiting: WaitingExecState<M>,
459        result: Result<crate::ExecResponse, String>,
460    ) -> Vec<DriverAction<M>>;
461}
462
463/// Configuration for a `TurnMachine` instance.
464pub struct TurnMachineConfig<M: ModeProtocol = UnitModeProtocol> {
465    pub protocol_driver: Arc<dyn ProtocolDriverHandle<M>>,
466    pub projector: Arc<dyn ContextProjector<M>>,
467    pub sync_execution_surface: bool,
468    pub model: String,
469    pub max_turns: Option<usize>,
470    pub model_variant: Option<String>,
471    pub run_session_id: Option<String>,
472    pub autonomous: bool,
473    pub tool_specs: Arc<Vec<LlmToolSpec>>,
474    pub system_prompt: Arc<str>,
475    pub session_id: String,
476    pub emit_llm_trace: bool,
477    pub termination: M::Termination,
478}
479
480// ─── Internal state ───
481
482#[derive(Debug, Serialize, serde::Deserialize)]
483enum MachineState<M: ModeProtocol = UnitModeProtocol> {
484    PreparingMode,
485    WaitingExecutionSurface {
486        effect_id: EffectId,
487        update_machine_config: bool,
488    },
489    PrepareIteration,
490    WaitingLlm {
491        effect_id: EffectId,
492        request: Arc<LlmRequest>,
493        driver_state: Option<M::DriverState>,
494    },
495    WaitingTools {
496        effect_id: EffectId,
497        calls: Vec<PendingToolCall>,
498    },
499    WaitingExec {
500        effect_id: EffectId,
501        code: String,
502        driver_state: M::DriverState,
503    },
504    WaitingCheckpoint {
505        effect_id: EffectId,
506        checkpoint: CheckpointKind,
507        on_empty: CheckpointResumeAction,
508    },
509    Finished,
510}
511
512#[derive(Clone, Debug, Serialize, serde::Deserialize)]
513pub struct TurnCheckpoint<M: ModeProtocol = UnitModeProtocol> {
514    state: MachineState<M>,
515    pending_effects: Vec<CheckpointEffect<M>>,
516    next_effect_id: u64,
517    messages: Vec<Message>,
518    events: Vec<SessionEventRecord<M::Event>>,
519    mode_iteration: usize,
520    mode_run_offset: usize,
521    cumulative_usage: TokenUsage,
522    termination: TurnTerminationPolicyState,
523    synced_mode_iteration: Option<usize>,
524}
525
526impl<M: ModeProtocol> Clone for MachineState<M> {
527    fn clone(&self) -> Self {
528        match self {
529            Self::PreparingMode => Self::PreparingMode,
530            Self::WaitingExecutionSurface {
531                effect_id,
532                update_machine_config,
533            } => Self::WaitingExecutionSurface {
534                effect_id: *effect_id,
535                update_machine_config: *update_machine_config,
536            },
537            Self::PrepareIteration => Self::PrepareIteration,
538            Self::WaitingLlm {
539                effect_id,
540                request,
541                driver_state,
542            } => Self::WaitingLlm {
543                effect_id: *effect_id,
544                request: Arc::clone(request),
545                driver_state: driver_state.clone(),
546            },
547            Self::WaitingTools { effect_id, calls } => Self::WaitingTools {
548                effect_id: *effect_id,
549                calls: calls.clone(),
550            },
551            Self::WaitingExec {
552                effect_id,
553                code,
554                driver_state,
555            } => Self::WaitingExec {
556                effect_id: *effect_id,
557                code: code.clone(),
558                driver_state: driver_state.clone(),
559            },
560            Self::WaitingCheckpoint {
561                effect_id,
562                checkpoint,
563                on_empty,
564            } => Self::WaitingCheckpoint {
565                effect_id: *effect_id,
566                checkpoint: *checkpoint,
567                on_empty: on_empty.clone(),
568            },
569            Self::Finished => Self::Finished,
570        }
571    }
572}
573
574impl<M: ModeProtocol> CheckpointEffect<M> {
575    fn from_effect(effect: &Effect<M>) -> Self {
576        match effect {
577            Effect::SyncExecutionSurface {
578                id,
579                update_machine_config,
580            } => Self::SyncExecutionSurface {
581                id: *id,
582                update_machine_config: *update_machine_config,
583            },
584            Effect::LlmCall { id, request } => Self::LlmCall {
585                id: *id,
586                request: Arc::clone(request),
587            },
588            Effect::CancelLlm { id } => Self::CancelLlm { id: *id },
589            Effect::ToolCalls { id, calls } => Self::ToolCalls {
590                id: *id,
591                calls: calls.clone(),
592            },
593            Effect::ExecCode { id, code } => Self::ExecCode {
594                id: *id,
595                code: code.clone(),
596            },
597            Effect::Checkpoint { id, checkpoint } => Self::Checkpoint {
598                id: *id,
599                checkpoint: *checkpoint,
600            },
601            Effect::Sleep { id, duration } => Self::Sleep {
602                id: *id,
603                duration: *duration,
604            },
605            Effect::Log { event } => Self::Log {
606                event: event.clone(),
607            },
608            Effect::Emit(event) => Self::Emit(event.clone()),
609            Effect::Progress {
610                messages,
611                events,
612                mode_iteration,
613            } => Self::Progress {
614                messages: messages.iter().cloned().collect(),
615                events: events.as_ref().clone(),
616                mode_iteration: *mode_iteration,
617            },
618            Effect::Done {
619                messages,
620                events,
621                mode_iteration,
622            } => Self::Done {
623                messages: messages.iter().cloned().collect(),
624                events: events.as_ref().clone(),
625                mode_iteration: *mode_iteration,
626            },
627        }
628    }
629
630    fn into_effect(self) -> Effect<M> {
631        match self {
632            Self::SyncExecutionSurface {
633                id,
634                update_machine_config,
635            } => Effect::SyncExecutionSurface {
636                id,
637                update_machine_config,
638            },
639            Self::LlmCall { id, request } => Effect::LlmCall { id, request },
640            Self::CancelLlm { id } => Effect::CancelLlm { id },
641            Self::ToolCalls { id, calls } => Effect::ToolCalls { id, calls },
642            Self::ExecCode { id, code } => Effect::ExecCode { id, code },
643            Self::Checkpoint { id, checkpoint } => Effect::Checkpoint { id, checkpoint },
644            Self::Sleep { id, duration } => Effect::Sleep { id, duration },
645            Self::Log { event } => Effect::Log { event },
646            Self::Emit(event) => Effect::Emit(event),
647            Self::Progress {
648                messages,
649                events,
650                mode_iteration,
651            } => Effect::Progress {
652                messages: MessageSequence::from_owned(messages),
653                events: Arc::new(events),
654                mode_iteration,
655            },
656            Self::Done {
657                messages,
658                events,
659                mode_iteration,
660            } => Effect::Done {
661                messages: MessageSequence::from_owned(messages),
662                events: Arc::new(events),
663                mode_iteration,
664            },
665        }
666    }
667}
668
669impl<M: ModeProtocol> MachineState<M> {
670    fn outstanding_effect_id(&self) -> Option<EffectId> {
671        match self {
672            Self::WaitingExecutionSurface { effect_id, .. }
673            | Self::WaitingLlm { effect_id, .. }
674            | Self::WaitingTools { effect_id, .. }
675            | Self::WaitingExec { effect_id, .. }
676            | Self::WaitingCheckpoint { effect_id, .. } => Some(*effect_id),
677            Self::PreparingMode | Self::PrepareIteration | Self::Finished => None,
678        }
679    }
680
681    fn outstanding_effect(&self) -> Option<Effect<M>> {
682        match self {
683            Self::WaitingExecutionSurface {
684                effect_id,
685                update_machine_config,
686            } => Some(Effect::SyncExecutionSurface {
687                id: *effect_id,
688                update_machine_config: *update_machine_config,
689            }),
690            Self::WaitingLlm {
691                effect_id, request, ..
692            } => Some(Effect::LlmCall {
693                id: *effect_id,
694                request: Arc::clone(request),
695            }),
696            Self::WaitingTools { effect_id, calls } => Some(Effect::ToolCalls {
697                id: *effect_id,
698                calls: calls.clone(),
699            }),
700            Self::WaitingExec {
701                effect_id, code, ..
702            } => Some(Effect::ExecCode {
703                id: *effect_id,
704                code: code.clone(),
705            }),
706            Self::WaitingCheckpoint {
707                effect_id,
708                checkpoint,
709                ..
710            } => Some(Effect::Checkpoint {
711                id: *effect_id,
712                checkpoint: *checkpoint,
713            }),
714            Self::PreparingMode | Self::PrepareIteration | Self::Finished => None,
715        }
716    }
717}
718
719/// Sans-IO state machine for a single session run (multi-turn).
720pub struct TurnMachine<M: ModeProtocol = UnitModeProtocol> {
721    config: TurnMachineConfig<M>,
722    state: MachineState<M>,
723    pending_effects: VecDeque<Effect<M>>,
724    active_effect_redelivery: bool,
725    next_effect_id: u64,
726    messages: MessageSequence,
727    events: Arc<Vec<SessionEventRecord<M::Event>>>,
728    mode_iteration: usize,
729    mode_run_offset: usize,
730    cumulative_usage: TokenUsage,
731    termination: TurnTerminationPolicyState,
732    synced_mode_iteration: Option<usize>,
733}
734
735impl<M: ModeProtocol> TurnMachine<M> {
736    /// Create a new machine in `PrepareIteration` state.
737    pub fn new(
738        config: TurnMachineConfig<M>,
739        messages: Vec<Message>,
740        events: Arc<Vec<SessionEventRecord<M::Event>>>,
741        mode_run_offset: usize,
742    ) -> Self {
743        Self::new_shared(
744            config,
745            MessageSequence::from_owned(messages),
746            events,
747            mode_run_offset,
748        )
749    }
750
751    pub fn new_shared(
752        config: TurnMachineConfig<M>,
753        messages: MessageSequence,
754        events: Arc<Vec<SessionEventRecord<M::Event>>>,
755        mode_run_offset: usize,
756    ) -> Self {
757        Self {
758            config,
759            state: MachineState::PreparingMode,
760            pending_effects: VecDeque::new(),
761            active_effect_redelivery: false,
762            next_effect_id: 1,
763            messages,
764            events,
765            mode_iteration: mode_run_offset,
766            mode_run_offset,
767            cumulative_usage: TokenUsage::default(),
768            termination: TurnTerminationPolicyState::new(),
769            synced_mode_iteration: None,
770        }
771    }
772
773    /// Whether the machine has finished.
774    pub fn is_done(&self) -> bool {
775        matches!(self.state, MachineState::Finished)
776    }
777
778    pub fn messages(&self) -> Arc<Vec<Message>> {
779        self.messages.shared()
780    }
781
782    pub fn events(&self) -> Arc<Vec<SessionEventRecord<M::Event>>> {
783        Arc::clone(&self.events)
784    }
785
786    pub fn message_sequence(&self) -> MessageSequence {
787        self.messages.clone()
788    }
789
790    pub fn mode_iteration(&self) -> usize {
791        self.mode_iteration
792    }
793
794    pub fn checkpoint(&self) -> TurnCheckpoint<M> {
795        let active_effect_id = self.state.outstanding_effect_id();
796        let pending_effects = self
797            .pending_effects
798            .iter()
799            .filter(|effect| active_effect_id.is_none_or(|id| effect.id() != Some(id)))
800            .map(CheckpointEffect::from_effect)
801            .collect::<Vec<_>>();
802        TurnCheckpoint {
803            state: self.state.clone(),
804            pending_effects,
805            next_effect_id: self.next_effect_id,
806            messages: self.messages.iter().cloned().collect(),
807            events: self.events.as_ref().clone(),
808            mode_iteration: self.mode_iteration,
809            mode_run_offset: self.mode_run_offset,
810            cumulative_usage: self.cumulative_usage.clone(),
811            termination: self.termination.clone(),
812            synced_mode_iteration: self.synced_mode_iteration,
813        }
814    }
815
816    pub fn restore_from_checkpoint(
817        config: TurnMachineConfig<M>,
818        checkpoint: TurnCheckpoint<M>,
819    ) -> Self {
820        let active_effect_id = checkpoint.state.outstanding_effect_id();
821        let pending_effects = checkpoint
822            .pending_effects
823            .into_iter()
824            .map(CheckpointEffect::into_effect)
825            .collect::<VecDeque<_>>();
826        let active_effect_redelivery = active_effect_id.is_some()
827            && !pending_effects
828                .iter()
829                .any(|effect| effect.id() == active_effect_id);
830        Self {
831            config,
832            state: checkpoint.state,
833            pending_effects,
834            active_effect_redelivery,
835            next_effect_id: checkpoint.next_effect_id,
836            messages: MessageSequence::from_owned(checkpoint.messages),
837            events: Arc::new(checkpoint.events),
838            mode_iteration: checkpoint.mode_iteration,
839            mode_run_offset: checkpoint.mode_run_offset,
840            cumulative_usage: checkpoint.cumulative_usage,
841            termination: checkpoint.termination,
842            synced_mode_iteration: checkpoint.synced_mode_iteration,
843        }
844    }
845
846    fn driver_context(&self) -> DriverContextView<'_, M> {
847        DriverContextView {
848            config: &self.config,
849            messages: &self.messages,
850            events: self.events.as_slice(),
851            mode_iteration: self.mode_iteration,
852            mode_run_offset: self.mode_run_offset,
853            termination: &self.termination,
854        }
855    }
856
857    fn next_id(&mut self) -> EffectId {
858        let id = EffectId(self.next_effect_id);
859        self.next_effect_id += 1;
860        id
861    }
862
863    fn emit(&mut self, event: SessionEvent) {
864        self.pending_effects.push_back(Effect::Emit(event));
865    }
866
867    fn emit_progress(&mut self) {
868        self.pending_effects.push_back(Effect::Progress {
869            messages: self.messages.clone(),
870            events: Arc::clone(&self.events),
871            mode_iteration: self.mode_iteration,
872        });
873    }
874
875    pub fn fail_turn(&mut self, event: SessionEvent) {
876        self.emit(event);
877        self.finish(TurnOutcome::Stopped(TurnStop::RuntimeError));
878    }
879
880    pub fn finish_with_outcome(&mut self, outcome: TurnOutcome) {
881        self.finish(outcome);
882    }
883
884    fn finish(&mut self, outcome: TurnOutcome) {
885        self.emit(SessionEvent::TurnOutcome { outcome });
886        self.emit(SessionEvent::Done);
887        let msgs = std::mem::take(&mut self.messages);
888        let events = Arc::clone(&self.events);
889        let mode_iteration = self.mode_iteration;
890        self.state = MachineState::Finished;
891        self.pending_effects.push_back(Effect::Done {
892            messages: msgs,
893            events,
894            mode_iteration,
895        });
896    }
897
898    /// Drain the next pending effect. Returns `None` when the host must call
899    /// `handle_response()` before more effects become available.
900    pub fn poll_effect(&mut self) -> Option<Effect<M>> {
901        if let Some(effect) = self.pending_effects.pop_front() {
902            return Some(effect);
903        }
904        if self.active_effect_redelivery {
905            self.active_effect_redelivery = false;
906            if let Some(effect) = self.state.outstanding_effect() {
907                return Some(effect);
908            }
909        }
910
911        match &self.state {
912            MachineState::PreparingMode => {
913                self.prepare_mode();
914                self.pending_effects.pop_front()
915            }
916            MachineState::PrepareIteration => {
917                self.prepare_mode_iteration();
918                self.pending_effects.pop_front()
919            }
920            _ => None,
921        }
922    }
923
924    // ─── State transitions ───
925
926    fn prepare_mode(&mut self) {
927        if self.config.sync_execution_surface {
928            let id = self.next_id();
929            self.state = MachineState::WaitingExecutionSurface {
930                effect_id: id,
931                update_machine_config: false,
932            };
933            self.pending_effects
934                .push_back(Effect::SyncExecutionSurface {
935                    id,
936                    update_machine_config: false,
937                });
938            return;
939        }
940
941        self.prepare_mode_iteration();
942    }
943
944    fn prepare_mode_iteration(&mut self) {
945        if self.config.sync_execution_surface
946            && self.synced_mode_iteration != Some(self.mode_iteration)
947        {
948            let id = self.next_id();
949            self.state = MachineState::WaitingExecutionSurface {
950                effect_id: id,
951                update_machine_config: true,
952            };
953            self.pending_effects
954                .push_back(Effect::SyncExecutionSurface {
955                    id,
956                    update_machine_config: true,
957                });
958            return;
959        }
960        let actions = {
961            let driver = Arc::clone(&self.config.protocol_driver);
962            let ctx = self.driver_context();
963            driver.prepare_mode_iteration(ctx)
964        };
965        self.apply_actions(actions);
966    }
967
968    fn start_llm_request(
969        &mut self,
970        request: Arc<LlmRequest>,
971        driver_state: Option<M::DriverState>,
972    ) {
973        let tool_list = self
974            .config
975            .tool_specs
976            .iter()
977            .map(|tool| tool.name.as_str())
978            .collect::<Vec<_>>()
979            .join(", ");
980        self.emit(SessionEvent::LlmRequest {
981            mode_iteration: self.mode_iteration,
982            message_count: self.messages.len(),
983            tool_list,
984        });
985
986        let id = self.next_id();
987        self.state = MachineState::WaitingLlm {
988            effect_id: id,
989            request: Arc::clone(&request),
990            driver_state,
991        };
992        self.pending_effects
993            .push_back(Effect::LlmCall { id, request });
994    }
995
996    fn start_tool_calls(&mut self, calls: Vec<PendingToolCall>) {
997        let effect_id = self.next_id();
998        self.state = MachineState::WaitingTools {
999            effect_id,
1000            calls: calls.clone(),
1001        };
1002        self.pending_effects.push_back(Effect::ToolCalls {
1003            id: effect_id,
1004            calls,
1005        });
1006    }
1007
1008    fn start_exec(&mut self, code: String, driver_state: M::DriverState) {
1009        let effect_id = self.next_id();
1010        self.state = MachineState::WaitingExec {
1011            effect_id,
1012            code: code.clone(),
1013            driver_state,
1014        };
1015        self.pending_effects.push_back(Effect::ExecCode {
1016            id: effect_id,
1017            code,
1018        });
1019    }
1020
1021    fn append_event(&mut self, event: SessionEventRecord<M::Event>) {
1022        match event {
1023            SessionEventRecord::Conversation(record) => {
1024                Arc::make_mut(&mut self.events)
1025                    .push(SessionEventRecord::Conversation(record.clone()));
1026                self.messages.push(record.to_message());
1027            }
1028            SessionEventRecord::Tool(ToolEvent::Invocation { stable_key, record }) => {
1029                Arc::make_mut(&mut self.events).push(SessionEventRecord::Tool(
1030                    ToolEvent::Invocation { stable_key, record },
1031                ));
1032            }
1033            SessionEventRecord::Mode(mode_event) => {
1034                Arc::make_mut(&mut self.events).push(SessionEventRecord::Mode(mode_event));
1035            }
1036            SessionEventRecord::StateSnapshot(snapshot) => {
1037                Arc::make_mut(&mut self.events).push(SessionEventRecord::StateSnapshot(snapshot));
1038            }
1039        }
1040    }
1041
1042    pub fn apply_actions(&mut self, actions: Vec<DriverAction<M>>) {
1043        let mut progress_dirty = false;
1044        for action in actions {
1045            match action {
1046                DriverAction::Emit(event) => self.emit(event),
1047                DriverAction::AppendEvents(events) => {
1048                    if !events.is_empty() {
1049                        for event in events {
1050                            self.append_event(event);
1051                        }
1052                        progress_dirty = true;
1053                    }
1054                }
1055                DriverAction::StartLlm {
1056                    request,
1057                    driver_state,
1058                } => self.start_llm_request(request, driver_state),
1059                DriverAction::StartTools { calls } => self.start_tool_calls(calls),
1060                DriverAction::StartExec { code, driver_state } => {
1061                    self.start_exec(code, driver_state)
1062                }
1063                DriverAction::StartCheckpoint {
1064                    checkpoint,
1065                    on_empty,
1066                } => self.request_checkpoint(checkpoint, on_empty),
1067                DriverAction::AdvanceModeIteration => {
1068                    self.mode_iteration += 1;
1069                    self.synced_mode_iteration = None;
1070                    progress_dirty = true;
1071                }
1072                DriverAction::ScheduleTurnLimitFinal => {
1073                    self.termination.maybe_schedule_turn_limit_final(
1074                        self.mode_iteration,
1075                        self.mode_run_offset,
1076                        self.config.max_turns,
1077                        self.messages.make_mut(),
1078                    );
1079                    progress_dirty = true;
1080                }
1081                DriverAction::Finish(outcome) => {
1082                    if progress_dirty {
1083                        self.emit_progress();
1084                        progress_dirty = false;
1085                    }
1086                    self.finish(outcome);
1087                    break;
1088                }
1089            }
1090        }
1091        if progress_dirty {
1092            self.emit_progress();
1093        }
1094    }
1095
1096    /// Feed a response to a previously emitted effect.
1097    pub fn handle_response(&mut self, response: Response) {
1098        self.active_effect_redelivery = false;
1099        match response {
1100            Response::ExecutionSurfaceSynced { id, result } => {
1101                self.handle_execution_surface_synced(id, result)
1102            }
1103            Response::LlmComplete {
1104                id,
1105                result,
1106                text_streamed,
1107            } => self.handle_llm_complete(id, result, text_streamed),
1108            Response::ToolResults { id, results } => self.handle_tool_results(id, results),
1109            Response::ExecResult { id, result } => self.handle_exec_result(id, result),
1110            Response::Checkpoint {
1111                id,
1112                messages,
1113                transient_messages,
1114            } => self.handle_checkpoint(id, messages, transient_messages),
1115            Response::Timeout { id } => self.handle_timeout(id),
1116        }
1117    }
1118
1119    fn request_checkpoint(&mut self, checkpoint: CheckpointKind, on_empty: CheckpointResumeAction) {
1120        let id = self.next_id();
1121        self.state = MachineState::WaitingCheckpoint {
1122            effect_id: id,
1123            checkpoint,
1124            on_empty,
1125        };
1126        self.pending_effects
1127            .push_back(Effect::Checkpoint { id, checkpoint });
1128    }
1129
1130    fn handle_execution_surface_synced(
1131        &mut self,
1132        id: EffectId,
1133        result: Result<Option<ExecutionSurfaceSync>, String>,
1134    ) {
1135        let (waiting_id, waiting_update_machine_config) =
1136            match std::mem::replace(&mut self.state, MachineState::Finished) {
1137                MachineState::WaitingExecutionSurface {
1138                    effect_id,
1139                    update_machine_config,
1140                } => (effect_id, update_machine_config),
1141                other => {
1142                    self.state = other;
1143                    return;
1144                }
1145            };
1146        if waiting_id != id {
1147            self.state = MachineState::WaitingExecutionSurface {
1148                effect_id: waiting_id,
1149                update_machine_config: waiting_update_machine_config,
1150            };
1151            return;
1152        }
1153
1154        match result {
1155            Ok(update) => {
1156                if let Some(update) = update {
1157                    self.config.system_prompt = update.system_prompt;
1158                    self.config.tool_specs = update.tool_specs;
1159                }
1160                self.synced_mode_iteration = Some(self.mode_iteration);
1161                self.state = MachineState::PrepareIteration;
1162            }
1163            Err(error) => {
1164                self.fail_turn(make_error_event(
1165                    "execution_surface",
1166                    Some("reconfigure_failed"),
1167                    format!("Failed to refresh execution surface: {error}"),
1168                    Some(error),
1169                ));
1170            }
1171        }
1172    }
1173
1174    fn append_checkpoint_messages(&mut self, plugin_messages: &[PluginMessage], transient: bool) {
1175        let appended = plugin_messages
1176            .iter()
1177            .filter(|message| matches!(message.role, MessageRole::User | MessageRole::System))
1178            .map(|message| {
1179                let message_id = fresh_message_id();
1180                let mut parts = if message.parts.is_empty() {
1181                    vec![Part {
1182                        id: format!("{message_id}.p0"),
1183                        kind: PartKind::Text,
1184                        content: message.content.clone(),
1185                        attachment: None,
1186                        tool_call_id: None,
1187                        tool_name: None,
1188                        tool_replay: None,
1189                        prune_state: PruneState::Intact,
1190                        reasoning_meta: None,
1191                        response_meta: None,
1192                    }]
1193                } else {
1194                    message.parts.clone()
1195                };
1196                reassign_part_ids(&message_id, &mut parts);
1197                Message {
1198                    id: message_id.clone(),
1199                    role: message.role,
1200                    parts: Arc::new(parts),
1201                    origin: Some(MessageOrigin::Plugin {
1202                        plugin_id: "plugin".to_string(),
1203                        transient,
1204                    }),
1205                }
1206            })
1207            .collect::<Vec<_>>();
1208        if !appended.is_empty() {
1209            self.messages.extend(appended);
1210        }
1211    }
1212
1213    fn handle_checkpoint(
1214        &mut self,
1215        id: EffectId,
1216        messages: Vec<PluginMessage>,
1217        transient_messages: Vec<PluginMessage>,
1218    ) {
1219        let (effect_id, checkpoint, on_empty) =
1220            match std::mem::replace(&mut self.state, MachineState::Finished) {
1221                MachineState::WaitingCheckpoint {
1222                    effect_id,
1223                    checkpoint,
1224                    on_empty,
1225                } => (effect_id, checkpoint, on_empty),
1226                other => {
1227                    self.state = other;
1228                    return;
1229                }
1230            };
1231        if effect_id != id {
1232            self.state = MachineState::WaitingCheckpoint {
1233                effect_id,
1234                checkpoint,
1235                on_empty,
1236            };
1237            return;
1238        }
1239
1240        if !messages.is_empty() || !transient_messages.is_empty() {
1241            self.append_checkpoint_messages(&messages, false);
1242            self.append_checkpoint_messages(&transient_messages, true);
1243            if matches!(checkpoint, CheckpointKind::BeforeCompletion) {
1244                self.mode_iteration += 1;
1245                if self.termination.should_force_exit_after_grace_turn() {
1246                    self.emit_progress();
1247                    self.finish(TurnOutcome::Stopped(TurnStop::MaxTurns));
1248                    return;
1249                }
1250                self.termination.maybe_schedule_turn_limit_final(
1251                    self.mode_iteration,
1252                    self.mode_run_offset,
1253                    self.config.max_turns,
1254                    self.messages.make_mut(),
1255                );
1256            }
1257            self.state = MachineState::PrepareIteration;
1258            self.emit_progress();
1259            return;
1260        }
1261
1262        match on_empty {
1263            CheckpointResumeAction::PrepareIteration => {
1264                self.state = MachineState::PrepareIteration;
1265            }
1266            CheckpointResumeAction::Finish(outcome) => self.finish(outcome),
1267        }
1268    }
1269
1270    fn take_waiting_llm_state(&mut self, id: EffectId) -> Option<WaitingLlmState<M>> {
1271        match std::mem::replace(&mut self.state, MachineState::Finished) {
1272            MachineState::WaitingLlm {
1273                effect_id,
1274                request,
1275                driver_state,
1276            } if effect_id == id => Some(WaitingLlmState {
1277                request,
1278                driver_state,
1279            }),
1280            other => {
1281                self.state = other;
1282                None
1283            }
1284        }
1285    }
1286
1287    fn handle_llm_complete(
1288        &mut self,
1289        id: EffectId,
1290        result: Result<LlmResponse, LlmCallError>,
1291        text_streamed: bool,
1292    ) {
1293        let Some(waiting) = self.take_waiting_llm_state(id) else {
1294            return;
1295        };
1296        match result {
1297            Err(error) => {
1298                self.emit_llm_error(error);
1299            }
1300            Ok(llm_response) => {
1301                self.record_llm_usage(&llm_response, self.llm_response_text(&llm_response));
1302                if self.handle_terminal_llm_response(&llm_response, text_streamed) {
1303                    return;
1304                }
1305                let actions = {
1306                    let driver = Arc::clone(&self.config.protocol_driver);
1307                    let ctx = self.driver_context();
1308                    driver.handle_llm_success(ctx, waiting, llm_response, text_streamed)
1309                };
1310                self.apply_actions(actions);
1311            }
1312        }
1313    }
1314
1315    fn handle_terminal_llm_response(
1316        &mut self,
1317        llm_response: &LlmResponse,
1318        text_streamed: bool,
1319    ) -> bool {
1320        let outcome = match llm_response.terminal_reason {
1321            LlmTerminalReason::OutputLimit => TurnOutcome::Stopped(TurnStop::Incomplete),
1322            LlmTerminalReason::ContextOverflow => TurnOutcome::Stopped(TurnStop::ProviderError),
1323            LlmTerminalReason::ContentFilter => TurnOutcome::Stopped(TurnStop::ProviderError),
1324            LlmTerminalReason::ProviderError => TurnOutcome::Stopped(TurnStop::ProviderError),
1325            LlmTerminalReason::Cancelled => TurnOutcome::Stopped(TurnStop::Cancelled),
1326            LlmTerminalReason::Stop | LlmTerminalReason::ToolUse | LlmTerminalReason::Unknown => {
1327                return false;
1328            }
1329        };
1330
1331        if !text_streamed && !llm_response.full_text.is_empty() {
1332            self.emit(SessionEvent::TextDelta {
1333                content: llm_response.full_text.clone(),
1334            });
1335        }
1336        self.emit(SessionEvent::LlmResponse {
1337            mode_iteration: self.mode_iteration,
1338            content: llm_response.full_text.clone(),
1339            duration_ms: 0,
1340        });
1341        let reason = llm_response.terminal_reason;
1342        let diagnostic = llm_response
1343            .terminal_diagnostic
1344            .clone()
1345            .unwrap_or_else(|| format!("Model call ended with terminal reason {reason:?}."));
1346        self.emit(SessionEvent::Error {
1347            message: diagnostic.clone(),
1348            envelope: Some(crate::session_model::make_error_envelope(
1349                "llm_provider",
1350                Some(reason.code()),
1351                Some(reason),
1352                diagnostic,
1353                None,
1354            )),
1355        });
1356        self.finish(outcome);
1357        true
1358    }
1359
1360    fn llm_response_text<'a>(&self, llm_response: &'a LlmResponse) -> &'a str {
1361        &llm_response.full_text
1362    }
1363
1364    fn llm_response_debug_parts(&self, llm_response: &LlmResponse) -> Option<Value> {
1365        let parts = llm_response
1366            .parts
1367            .iter()
1368            .filter_map(|part| match part {
1369                LlmOutputPart::Text { text, .. } if !text.is_empty() => Some(serde_json::json!({
1370                    "type": "text",
1371                    "text": text,
1372                })),
1373                LlmOutputPart::Text { .. } => None,
1374                LlmOutputPart::Reasoning {
1375                    text,
1376                    replay,
1377                } => Some(serde_json::json!({
1378                    "type": "reasoning",
1379                    "id": replay.as_ref().and_then(|meta| meta.item_id.as_ref()),
1380                    "summary": replay.as_ref().map(|meta| &meta.summary),
1381                    "text": text,
1382                    "has_encrypted": replay.as_ref().is_some_and(|meta| meta.encrypted_content.is_some() || meta.signature.is_some()),
1383                    "redacted": replay.as_ref().is_some_and(|meta| meta.redacted),
1384                })),
1385                LlmOutputPart::ToolCall {
1386                    call_id,
1387                    tool_name,
1388                    input_json,
1389                    replay,
1390                } => Some(serde_json::json!({
1391                    "type": "tool_call",
1392                    "call_id": call_id,
1393                    "tool_name": tool_name,
1394                    "input_json": input_json,
1395                    "id": replay.as_ref().and_then(|meta| meta.item_id.as_ref()),
1396                    "has_opaque": replay.as_ref().is_some_and(|meta| meta.opaque.is_some()),
1397                })),
1398            })
1399            .collect::<Vec<_>>();
1400        (!parts.is_empty()).then_some(Value::Array(parts))
1401    }
1402
1403    fn record_llm_usage(&mut self, llm_response: &LlmResponse, response_text: &str) {
1404        let usage = token_usage_from_llm_usage(&llm_response.usage);
1405        self.cumulative_usage.add(&usage);
1406        self.emit(SessionEvent::TokenUsage {
1407            mode_iteration: self.mode_iteration,
1408            usage: usage.clone(),
1409            cumulative: self.cumulative_usage.clone(),
1410        });
1411        if self.config.emit_llm_trace {
1412            let response_parts = self.llm_response_debug_parts(llm_response);
1413            self.pending_effects.push_back(Effect::Log {
1414                event: LogEvent::LlmDebug {
1415                    session_id: self.config.session_id.clone(),
1416                    mode_iteration: self.mode_iteration,
1417                    usage,
1418                    provider_usage: llm_response.provider_usage.clone(),
1419                    request_body: llm_response.request_body.clone(),
1420                    response_text: response_text.to_string(),
1421                    response_parts,
1422                },
1423            });
1424        }
1425    }
1426
1427    fn record_llm_error(&mut self, error: &LlmCallError) {
1428        if self.config.emit_llm_trace {
1429            self.pending_effects.push_back(Effect::Log {
1430                event: LogEvent::LlmError {
1431                    session_id: self.config.session_id.clone(),
1432                    mode_iteration: self.mode_iteration,
1433                    request_body: error.request_body.clone(),
1434                    message: error.message.clone(),
1435                    retryable: error.retryable,
1436                    raw: error.raw.clone(),
1437                    code: error.code.clone(),
1438                    terminal_reason: error.terminal_reason,
1439                },
1440            });
1441        }
1442    }
1443
1444    fn emit_llm_error(&mut self, error: LlmCallError) {
1445        self.record_llm_error(&error);
1446        self.emit(SessionEvent::Error {
1447            message: format!("LLM error: {}", error.message),
1448            envelope: Some(crate::session_model::make_error_envelope(
1449                "llm_provider",
1450                error.code.as_deref(),
1451                Some(error.terminal_reason),
1452                format!("LLM error: {}", error.message),
1453                error.raw,
1454            )),
1455        });
1456        self.finish(TurnOutcome::Stopped(TurnStop::ProviderError));
1457    }
1458
1459    fn handle_tool_results(&mut self, id: EffectId, completed: Vec<CompletedToolCall>) {
1460        let (waiting_effect_id, waiting_calls) =
1461            match std::mem::replace(&mut self.state, MachineState::Finished) {
1462                MachineState::WaitingTools { effect_id, calls } => (effect_id, calls),
1463                other => {
1464                    self.state = other;
1465                    return;
1466                }
1467            };
1468
1469        if waiting_effect_id != id {
1470            self.state = MachineState::WaitingTools {
1471                effect_id: waiting_effect_id,
1472                calls: waiting_calls,
1473            };
1474            return;
1475        }
1476
1477        for outcome in &completed {
1478            self.emit(SessionEvent::ToolCall {
1479                call_id: Some(outcome.call_id.clone()),
1480                name: outcome.tool_name.clone(),
1481                args: outcome.args.clone(),
1482                output: outcome.output.clone(),
1483                duration_ms: outcome.duration_ms,
1484            });
1485        }
1486
1487        let actions = {
1488            let driver = Arc::clone(&self.config.protocol_driver);
1489            let ctx = self.driver_context();
1490            driver.handle_tool_results(ctx, completed)
1491        };
1492        self.apply_actions(actions);
1493    }
1494
1495    fn take_waiting_exec_state(&mut self, id: EffectId) -> Option<WaitingExecState<M>> {
1496        match std::mem::replace(&mut self.state, MachineState::Finished) {
1497            MachineState::WaitingExec {
1498                effect_id,
1499                code: _,
1500                driver_state,
1501            } if effect_id == id => Some(WaitingExecState { driver_state }),
1502            other => {
1503                self.state = other;
1504                None
1505            }
1506        }
1507    }
1508
1509    fn handle_exec_result(&mut self, id: EffectId, result: Result<crate::ExecResponse, String>) {
1510        let Some(waiting) = self.take_waiting_exec_state(id) else {
1511            return;
1512        };
1513        let actions = {
1514            let driver = Arc::clone(&self.config.protocol_driver);
1515            let ctx = self.driver_context();
1516            driver.handle_exec_result(ctx, waiting, result)
1517        };
1518        self.apply_actions(actions);
1519    }
1520
1521    fn handle_timeout(&mut self, _id: EffectId) {}
1522}
1523
1524fn token_usage_from_llm_usage(usage: &crate::llm::types::LlmUsage) -> TokenUsage {
1525    TokenUsage {
1526        input_tokens: usage.input_tokens,
1527        output_tokens: usage.output_tokens,
1528        cached_input_tokens: usage.cached_input_tokens,
1529        reasoning_tokens: usage.reasoning_tokens,
1530    }
1531}
1532
1533#[cfg(test)]
1534mod tests;