Skip to main content

lash_sansio/sansio/sections/
turn_protocol.rs

1impl TurnProtocol for UnitTurnProtocol {
2    type Event = ();
3    type Termination = ();
4    type DriverState = serde_json::Value;
5}
6
7/// Opaque identifier linking an effect to its response.
8#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, serde::Deserialize)]
9pub struct EffectId(pub u64);
10
11#[derive(Clone, Debug, Serialize, serde::Deserialize)]
12pub struct PendingToolCall {
13    pub call_id: String,
14    pub tool_name: String,
15    pub args: Value,
16    /// Opaque provider replay state carried through for the next request.
17    pub replay: Option<ProviderReplayMeta>,
18}
19
20#[derive(Clone, Debug, Serialize, serde::Deserialize)]
21pub struct CompletedToolCall {
22    pub call_id: String,
23    pub tool_name: String,
24    pub args: Value,
25    pub output: ToolCallOutput,
26    pub model_return: ModelToolReturn,
27    pub duration_ms: u64,
28    /// See [`PendingToolCall::replay`].
29    pub replay: Option<ProviderReplayMeta>,
30}
31
32#[derive(Clone, Debug, PartialEq, Eq, Serialize, serde::Deserialize)]
33pub struct TurnCause {
34    pub id: String,
35    pub event_type: String,
36    pub origin: MessageOrigin,
37    pub text: String,
38}
39
40impl TurnCause {
41    pub fn to_event_message(&self) -> Message {
42        Message {
43            id: self.id.clone(),
44            role: MessageRole::Event,
45            parts: Arc::new(vec![Part {
46                id: format!("{}.p0", self.id),
47                kind: PartKind::Text,
48                content: self.text.clone(),
49                attachment: None,
50                tool_call_id: None,
51                tool_name: None,
52                tool_replay: None,
53                prune_state: PruneState::Intact,
54                reasoning_meta: None,
55                response_meta: None,
56            }]),
57            origin: Some(self.origin.clone()),
58        }
59    }
60}
61
62#[derive(Clone, Debug, Default, Serialize, serde::Deserialize)]
63pub struct CheckpointDelivery {
64    pub messages: Vec<PluginMessage>,
65    pub transient_messages: Vec<PluginMessage>,
66    pub turn_causes: Vec<TurnCause>,
67}
68
69pub fn render_turn_causes_prompt(causes: &[TurnCause]) -> Option<String> {
70    if causes.is_empty() {
71        return None;
72    }
73
74    let mut rendered = String::from("=== TURN EVENTS ===");
75    for (index, cause) in causes.iter().enumerate() {
76        rendered.push_str("\n\n");
77        rendered.push_str(&format!(
78            "--- event[{index}] · {} · {} ---\n",
79            cause.event_type, cause.id
80        ));
81        rendered.push_str("Origin: ");
82        rendered.push_str(&render_message_origin(&cause.origin));
83        rendered.push_str("\n\n");
84        rendered.push_str(cause.text.trim());
85    }
86    Some(rendered)
87}
88
89fn render_message_origin(origin: &MessageOrigin) -> String {
90    match origin {
91        MessageOrigin::Plugin {
92            plugin_id,
93            transient,
94        } => {
95            if *transient {
96                format!("plugin {plugin_id} (transient)")
97            } else {
98                format!("plugin {plugin_id}")
99            }
100        }
101        MessageOrigin::Process {
102            process_id,
103            event_type,
104            sequence,
105            wake_id,
106            ..
107        } => match wake_id {
108            Some(wake_id) => {
109                format!("process {process_id} {event_type} #{sequence} ({wake_id})")
110            }
111            None => format!("process {process_id} {event_type} #{sequence}"),
112        },
113    }
114}
115
116#[derive(Clone, Debug, Serialize, serde::Deserialize)]
117pub enum LogEvent {
118    LlmDebug {
119        session_id: String,
120        protocol_iteration: usize,
121        usage: TokenUsage,
122        provider_usage: Option<Value>,
123        request_body: Option<String>,
124        response_text: String,
125        response_parts: Option<Value>,
126    },
127    LlmError {
128        session_id: String,
129        protocol_iteration: usize,
130        request_body: Option<String>,
131        message: String,
132        retryable: bool,
133        raw: Option<String>,
134        code: Option<String>,
135        terminal_reason: LlmTerminalReason,
136    },
137}
138
139/// An effect the host must fulfil.
140//
141// `Clone` is implemented by hand below rather than derived: the derive would
142// demand `M: Clone`, but only `M::Event` is ever cloned (and `TurnProtocol`
143// already guarantees `Event: Clone`), so a manual impl keeps `Effect<M>`
144// cloneable for every protocol — which the turn checkpoint relies on.
145#[derive(Debug, Serialize, serde::Deserialize)]
146#[allow(clippy::large_enum_variant)]
147pub enum Effect<M: TurnProtocol = UnitTurnProtocol> {
148    /// Sync the live execution environment before the turn proceeds.
149    ///
150    /// `update_machine_config` is only needed after the turn has
151    /// already advanced at least once and the host may need to swap in
152    /// a refreshed system prompt or tool schema for the next
153    /// protocol iteration. Initial syncs are host-only because the machine was
154    /// already constructed from a fresh execution environment.
155    SyncExecutionEnvironment {
156        id: EffectId,
157        update_machine_config: bool,
158    },
159    /// Start an LLM call.
160    LlmCall {
161        id: EffectId,
162        request: Arc<LlmRequest>,
163    },
164    /// Cancel an in-progress LLM stream.
165    CancelLlm { id: EffectId },
166    /// Execute one or more driver-scheduled tool calls.
167    ToolCalls {
168        id: EffectId,
169        calls: Vec<PendingToolCall>,
170    },
171    /// Execute a protocol-owned code block.
172    ExecCode {
173        id: EffectId,
174        language: String,
175        code: String,
176    },
177    /// Run a host/plugin checkpoint before the machine continues or completes.
178    Checkpoint {
179        id: EffectId,
180        checkpoint: CheckpointKind,
181    },
182    /// Host-implemented fire-and-forget logging.
183    Log { event: LogEvent },
184    /// Fire-and-forget event (no response needed).
185    Emit(SessionEvent),
186    /// Prompt-history progress that may be durably persisted by the host.
187    ///
188    /// This is separate from [`SessionEvent`]: UI stream events can be partial,
189    /// duplicated, or display-only, while `Progress` is emitted only after the
190    /// state machine has applied semantic message or protocol-step changes.
191    Progress {
192        messages: MessageSequence,
193        event_delta: Vec<SessionEventRecord<M::Event>>,
194        protocol_iteration: usize,
195    },
196    /// Turn is done.
197    Done {
198        messages: MessageSequence,
199        event_delta: Vec<SessionEventRecord<M::Event>>,
200        protocol_iteration: usize,
201    },
202}
203
204impl<M: TurnProtocol> Clone for Effect<M> {
205    fn clone(&self) -> Self {
206        match self {
207            Self::SyncExecutionEnvironment {
208                id,
209                update_machine_config,
210            } => Self::SyncExecutionEnvironment {
211                id: *id,
212                update_machine_config: *update_machine_config,
213            },
214            Self::LlmCall { id, request } => Self::LlmCall {
215                id: *id,
216                request: Arc::clone(request),
217            },
218            Self::CancelLlm { id } => Self::CancelLlm { id: *id },
219            Self::ToolCalls { id, calls } => Self::ToolCalls {
220                id: *id,
221                calls: calls.clone(),
222            },
223            Self::ExecCode { id, language, code } => Self::ExecCode {
224                id: *id,
225                language: language.clone(),
226                code: code.clone(),
227            },
228            Self::Checkpoint { id, checkpoint } => Self::Checkpoint {
229                id: *id,
230                checkpoint: *checkpoint,
231            },
232            Self::Log { event } => Self::Log {
233                event: event.clone(),
234            },
235            Self::Emit(event) => Self::Emit(event.clone()),
236            Self::Progress {
237                messages,
238                event_delta,
239                protocol_iteration,
240            } => Self::Progress {
241                messages: messages.clone(),
242                event_delta: event_delta.clone(),
243                protocol_iteration: *protocol_iteration,
244            },
245            Self::Done {
246                messages,
247                event_delta,
248                protocol_iteration,
249            } => Self::Done {
250                messages: messages.clone(),
251                event_delta: event_delta.clone(),
252                protocol_iteration: *protocol_iteration,
253            },
254        }
255    }
256}
257
258impl<M: TurnProtocol> Effect<M> {
259    fn id(&self) -> Option<EffectId> {
260        match self {
261            Self::SyncExecutionEnvironment { id, .. }
262            | Self::LlmCall { id, .. }
263            | Self::CancelLlm { id }
264            | Self::ToolCalls { id, .. }
265            | Self::ExecCode { id, .. }
266            | Self::Checkpoint { id, .. } => Some(*id),
267            Self::Log { .. } | Self::Emit(_) | Self::Progress { .. } | Self::Done { .. } => None,
268        }
269    }
270}
271
272/// Error details from a failed LLM call.
273#[derive(Clone, Debug, Serialize, serde::Deserialize)]
274pub struct LlmCallError {
275    pub message: String,
276    pub retryable: bool,
277    pub raw: Option<String>,
278    pub code: Option<String>,
279    pub terminal_reason: LlmTerminalReason,
280    pub request_body: Option<String>,
281}
282
283/// A response to a previously emitted effect.
284pub enum Response {
285    /// Live execution environment sync completed.
286    ExecutionEnvironmentSynced {
287        id: EffectId,
288        result: Result<Option<ExecutionEnvironmentSync>, String>,
289    },
290    /// Full LLM response.
291    LlmComplete {
292        id: EffectId,
293        result: Result<LlmResponse, LlmCallError>,
294        /// When true, text deltas were already emitted during streaming,
295        /// so the driver should skip emitting `TextDelta` events.
296        text_streamed: bool,
297    },
298    /// Native tool results.
299    ToolResults {
300        id: EffectId,
301        results: Vec<CompletedToolCall>,
302    },
303    /// Mode code execution result.
304    ExecResult {
305        id: EffectId,
306        result: Result<crate::ExecResponse, String>,
307    },
308    /// Checkpoint result with optional injected messages.
309    Checkpoint {
310        id: EffectId,
311        delivery: CheckpointDelivery,
312    },
313}
314
315#[derive(Clone, Debug, Serialize, serde::Deserialize)]
316pub struct ExecutionEnvironmentSync {
317    pub system_prompt: Arc<str>,
318    pub tool_specs: Arc<Vec<LlmToolSpec>>,
319}
320
321pub struct WaitingLlmState<M: TurnProtocol = UnitTurnProtocol> {
322    pub request: Arc<LlmRequest>,
323    driver_state: Option<M::DriverState>,
324}
325
326impl<M: TurnProtocol> WaitingLlmState<M> {
327    pub fn take_driver_state(&mut self) -> Option<M::DriverState> {
328        self.driver_state.take()
329    }
330}
331
332pub struct WaitingExecState<M: TurnProtocol = UnitTurnProtocol> {
333    driver_state: M::DriverState,
334}
335
336impl<M: TurnProtocol> WaitingExecState<M> {
337    pub fn into_driver_state(self) -> M::DriverState {
338        self.driver_state
339    }
340}
341
342#[derive(Clone, Debug, PartialEq, Serialize, serde::Deserialize)]
343pub enum CheckpointResumeAction {
344    PrepareIteration,
345    Finish(TurnOutcome),
346}
347
348#[allow(clippy::large_enum_variant)]
349pub enum DriverAction<M: TurnProtocol = UnitTurnProtocol> {
350    Emit(SessionEvent),
351    AppendEvents(Vec<SessionEventRecord<M::Event>>),
352    StartLlm {
353        request: Arc<LlmRequest>,
354        driver_state: Option<M::DriverState>,
355    },
356    StartTools {
357        calls: Vec<PendingToolCall>,
358    },
359    StartExec {
360        language: String,
361        code: String,
362        driver_state: M::DriverState,
363    },
364    StartCheckpoint {
365        checkpoint: CheckpointKind,
366        on_empty: CheckpointResumeAction,
367    },
368    AdvanceProtocolIteration,
369    ScheduleTurnLimitFinal {
370        message: Message,
371    },
372    Finish(TurnOutcome),
373}
374
375pub struct DriverContextView<'a, M: TurnProtocol = UnitTurnProtocol> {
376    config: &'a TurnMachineConfig<M>,
377    messages: &'a MessageSequence,
378    events: &'a [SessionEventRecord<M::Event>],
379    turn_causes: &'a [TurnCause],
380    protocol_iteration: usize,
381    protocol_run_offset: usize,
382    termination: &'a TurnTerminationPolicyState,
383}
384
385impl<'a, M: TurnProtocol> DriverContextView<'a, M> {
386    pub fn project_llm_request(&self, use_tools: bool) -> Arc<LlmRequest> {
387        self.config.projector.project(ProjectorContext {
388            config: self.config,
389            messages: self.messages,
390            events: self.events,
391            turn_causes: self.turn_causes,
392            protocol_iteration: self.protocol_iteration,
393            use_tools,
394        })
395    }
396
397    pub fn protocol_iteration(&self) -> usize {
398        self.protocol_iteration
399    }
400
401    pub fn protocol_run_offset(&self) -> usize {
402        self.protocol_run_offset
403    }
404
405    pub fn max_turns(&self) -> Option<usize> {
406        self.config.max_turns
407    }
408
409    pub fn termination(&self) -> &M::Termination {
410        &self.config.termination
411    }
412
413    pub fn autonomous(&self) -> bool {
414        self.config.autonomous
415    }
416
417    pub fn should_force_exit_after_grace_turn(&self) -> bool {
418        self.termination.should_force_exit_after_grace_turn()
419    }
420
421    pub fn turn_limit_final_to_schedule(&self) -> Option<usize> {
422        self.termination.turn_limit_final_to_schedule(
423            self.protocol_iteration,
424            self.protocol_run_offset,
425            self.config.max_turns,
426        )
427    }
428
429    pub fn messages(&self) -> &MessageSequence {
430        self.messages
431    }
432
433    pub fn events(&self) -> &[SessionEventRecord<M::Event>] {
434        self.events
435    }
436
437    pub fn turn_causes(&self) -> &[TurnCause] {
438        self.turn_causes
439    }
440}
441
442pub struct ProjectorContext<'a, M: TurnProtocol = UnitTurnProtocol> {
443    pub config: &'a TurnMachineConfig<M>,
444    pub messages: &'a MessageSequence,
445    pub events: &'a [SessionEventRecord<M::Event>],
446    pub turn_causes: &'a [TurnCause],
447    pub protocol_iteration: usize,
448    pub use_tools: bool,
449}
450
451pub trait ContextProjector<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
452    fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest>;
453}
454
455#[derive(Clone, Debug, Default)]
456pub struct ChatContextProjector;
457
458impl<M: TurnProtocol> ContextProjector<M> for ChatContextProjector {
459    fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest> {
460        let rendered_prompt = render_messages_for_projector(ctx.messages, ctx.turn_causes);
461        let attachments: Vec<LlmAttachment> = rendered_prompt.attachments;
462        let mut messages = rendered_prompt.messages;
463        if let Some(turn_events) = render_turn_causes_prompt(ctx.turn_causes) {
464            messages.push(crate::llm::types::LlmMessage::text(
465                crate::llm::types::LlmRole::User,
466                Arc::from(turn_events),
467            ));
468        }
469        if !ctx.config.system_prompt.trim().is_empty() {
470            messages.insert(
471                0,
472                crate::llm::types::LlmMessage::text(
473                    crate::llm::types::LlmRole::System,
474                    Arc::clone(&ctx.config.system_prompt),
475                ),
476            );
477        }
478
479        Arc::new(LlmRequest {
480            model: ctx.config.model.clone(),
481            messages,
482            attachments,
483            tools: if ctx.use_tools {
484                Arc::clone(&ctx.config.tool_specs)
485            } else {
486                Arc::new(Vec::new())
487            },
488            tool_choice: if ctx.use_tools {
489                LlmToolChoice::Auto
490            } else {
491                LlmToolChoice::None
492            },
493            model_variant: ctx.config.model_variant.clone(),
494            generation: ctx.config.generation.clone(),
495            session_id: ctx.config.run_session_id.clone(),
496            output_spec: None,
497            stream_events: None,
498            provider_trace: None,
499        })
500    }
501}
502
503fn render_messages_for_projector(
504    messages: &MessageSequence,
505    turn_causes: &[TurnCause],
506) -> crate::RenderedPrompt {
507    if turn_causes.is_empty() {
508        return messages.render_prompt();
509    }
510
511    let active_cause_ids = turn_causes
512        .iter()
513        .map(|cause| cause.id.as_str())
514        .collect::<HashSet<_>>();
515    let filtered = messages
516        .iter()
517        .filter(|message| {
518            !(matches!(message.role, MessageRole::Event)
519                && active_cause_ids.contains(message.id.as_str()))
520        })
521        .cloned()
522        .collect::<Vec<_>>();
523    render_prompt(filtered.as_slice())
524}
525
526pub trait ProtocolDriverHandle<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
527    fn prepare_protocol_iteration(&self, ctx: DriverContextView<'_, M>) -> Vec<DriverAction<M>>;
528    fn handle_llm_success(
529        &self,
530        ctx: DriverContextView<'_, M>,
531        waiting: WaitingLlmState<M>,
532        llm_response: LlmResponse,
533        text_streamed: bool,
534    ) -> Vec<DriverAction<M>>;
535    fn handle_tool_results(
536        &self,
537        ctx: DriverContextView<'_, M>,
538        completed: Vec<CompletedToolCall>,
539    ) -> Vec<DriverAction<M>>;
540    fn handle_exec_result(
541        &self,
542        ctx: DriverContextView<'_, M>,
543        waiting: WaitingExecState<M>,
544        result: Result<crate::ExecResponse, String>,
545    ) -> Vec<DriverAction<M>>;
546}
547
548/// Configuration for a `TurnMachine` instance.
549pub struct TurnMachineConfig<M: TurnProtocol = UnitTurnProtocol> {
550    pub protocol_driver: Arc<dyn ProtocolDriverHandle<M>>,
551    pub projector: Arc<dyn ContextProjector<M>>,
552    pub sync_execution_environment: bool,
553    pub model: String,
554    /// Model context-window size in tokens, if known. Lets the kernel
555    /// reclassify a zero-output `OutputLimit` terminal reason as
556    /// `ContextOverflow` when the prompt nearly filled the window. `None`
557    /// disables that refinement.
558    pub max_context_tokens: Option<usize>,
559    pub max_turns: Option<usize>,
560    pub model_variant: Option<String>,
561    pub generation: crate::llm::types::GenerationOptions,
562    pub run_session_id: Option<String>,
563    pub autonomous: bool,
564    pub tool_specs: Arc<Vec<LlmToolSpec>>,
565    pub system_prompt: Arc<str>,
566    pub session_id: String,
567    pub emit_llm_trace: bool,
568    pub termination: M::Termination,
569    pub turn_limit_final_message: crate::TurnLimitFinalMessage,
570}
571
572// ─── Internal state ───