Skip to main content

lash_sansio/sansio/sections/
turn_protocol.rs

1impl TurnProtocol for UnitTurnProtocol {
2    type Event = ();
3    type Termination = ();
4    type DriverState = serde_json::Value;
5}
6
7/// Opaque identifier linking an effect to its response.
8#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, serde::Deserialize)]
9pub struct EffectId(pub u64);
10
11#[derive(Clone, Debug, Serialize, serde::Deserialize)]
12pub struct PendingToolCall {
13    pub call_id: String,
14    pub tool_name: String,
15    pub args: Value,
16    /// Opaque provider replay state carried through for the next request.
17    pub replay: Option<ProviderReplayMeta>,
18}
19
20#[derive(Clone, Debug, Serialize, serde::Deserialize)]
21pub struct CompletedToolCall {
22    pub call_id: String,
23    pub tool_name: String,
24    pub args: Value,
25    pub output: ToolCallOutput,
26    pub model_return: ModelToolReturn,
27    pub duration_ms: u64,
28    /// See [`PendingToolCall::replay`].
29    pub replay: Option<ProviderReplayMeta>,
30}
31
32#[derive(Clone, Debug, PartialEq, Eq, Serialize, serde::Deserialize)]
33pub struct TurnCause {
34    pub id: String,
35    pub event_type: String,
36    pub origin: MessageOrigin,
37    pub text: String,
38}
39
40impl TurnCause {
41    pub fn to_event_message(&self) -> Message {
42        Message {
43            id: self.id.clone(),
44            role: MessageRole::Event,
45            parts: Arc::new(vec![Part {
46                id: format!("{}.p0", self.id),
47                kind: PartKind::Text,
48                content: self.text.clone(),
49                attachment: None,
50                tool_call_id: None,
51                tool_name: None,
52                tool_replay: None,
53                prune_state: PruneState::Intact,
54                reasoning_meta: None,
55                response_meta: None,
56            }]),
57            origin: Some(self.origin.clone()),
58        }
59    }
60}
61
62#[derive(Clone, Debug, Default, Serialize, serde::Deserialize)]
63pub struct CheckpointDelivery {
64    pub messages: Vec<PluginMessage>,
65    pub transient_messages: Vec<PluginMessage>,
66    pub turn_causes: Vec<TurnCause>,
67}
68
69pub fn render_turn_causes_prompt(causes: &[TurnCause]) -> Option<String> {
70    if causes.is_empty() {
71        return None;
72    }
73
74    let mut rendered = String::from("=== TURN EVENTS ===");
75    for (index, cause) in causes.iter().enumerate() {
76        rendered.push_str("\n\n");
77        rendered.push_str(&format!(
78            "--- event[{index}] · {} · {} ---\n",
79            cause.event_type, cause.id
80        ));
81        rendered.push_str("Origin: ");
82        rendered.push_str(&render_message_origin(&cause.origin));
83        rendered.push_str("\n\n");
84        rendered.push_str(cause.text.trim());
85    }
86    Some(rendered)
87}
88
89fn render_message_origin(origin: &MessageOrigin) -> String {
90    match origin {
91        MessageOrigin::Plugin {
92            plugin_id,
93            transient,
94        } => {
95            if *transient {
96                format!("plugin {plugin_id} (transient)")
97            } else {
98                format!("plugin {plugin_id}")
99            }
100        }
101        MessageOrigin::Process {
102            process_id,
103            event_type,
104            sequence,
105            wake_id,
106            ..
107        } => match wake_id {
108            Some(wake_id) => {
109                format!("process {process_id} {event_type} #{sequence} ({wake_id})")
110            }
111            None => format!("process {process_id} {event_type} #{sequence}"),
112        },
113    }
114}
115
116#[derive(Clone, Debug, Serialize, serde::Deserialize)]
117pub enum LogEvent {
118    LlmDebug {
119        session_id: String,
120        protocol_iteration: usize,
121        usage: TokenUsage,
122        provider_usage: Option<Value>,
123        request_body: Option<String>,
124        response_text: String,
125        response_parts: Option<Value>,
126    },
127    LlmError {
128        session_id: String,
129        protocol_iteration: usize,
130        request_body: Option<String>,
131        message: String,
132        retryable: bool,
133        raw: Option<String>,
134        code: Option<String>,
135        terminal_reason: LlmTerminalReason,
136    },
137}
138
139/// An effect the host must fulfil.
140//
141// `Clone` is implemented by hand below rather than derived: the derive would
142// demand `M: Clone`, but only `M::Event` is ever cloned (and `TurnProtocol`
143// already guarantees `Event: Clone`), so a manual impl keeps `Effect<M>`
144// cloneable for every protocol — which the turn checkpoint relies on.
145#[derive(Debug, Serialize, serde::Deserialize)]
146#[allow(clippy::large_enum_variant)]
147pub enum Effect<M: TurnProtocol = UnitTurnProtocol> {
148    /// Sync the live execution environment before the turn proceeds.
149    ///
150    /// `update_machine_config` is only needed after the turn has
151    /// already advanced at least once and the host may need to swap in
152    /// a refreshed system prompt or tool schema for the next
153    /// protocol iteration. Initial syncs are host-only because the machine was
154    /// already constructed from a fresh execution environment.
155    SyncExecutionEnvironment {
156        id: EffectId,
157        update_machine_config: bool,
158    },
159    /// Start an LLM call.
160    LlmCall {
161        id: EffectId,
162        request: Arc<LlmRequest>,
163    },
164    /// Cancel an in-progress LLM stream.
165    CancelLlm { id: EffectId },
166    /// Execute one or more driver-scheduled tool calls.
167    ToolCalls {
168        id: EffectId,
169        calls: Vec<PendingToolCall>,
170    },
171    /// Execute a protocol-owned code block.
172    ExecCode { id: EffectId, code: String },
173    /// Run a host/plugin checkpoint before the machine continues or completes.
174    Checkpoint {
175        id: EffectId,
176        checkpoint: CheckpointKind,
177    },
178    /// Host-implemented fire-and-forget logging.
179    Log { event: LogEvent },
180    /// Fire-and-forget event (no response needed).
181    Emit(SessionEvent),
182    /// Prompt-history progress that may be durably persisted by the host.
183    ///
184    /// This is separate from [`SessionEvent`]: UI stream events can be partial,
185    /// duplicated, or display-only, while `Progress` is emitted only after the
186    /// state machine has applied semantic message or protocol-step changes.
187    Progress {
188        messages: MessageSequence,
189        event_delta: Vec<SessionEventRecord<M::Event>>,
190        protocol_iteration: usize,
191    },
192    /// Turn is done.
193    Done {
194        messages: MessageSequence,
195        event_delta: Vec<SessionEventRecord<M::Event>>,
196        protocol_iteration: usize,
197    },
198}
199
200impl<M: TurnProtocol> Clone for Effect<M> {
201    fn clone(&self) -> Self {
202        match self {
203            Self::SyncExecutionEnvironment {
204                id,
205                update_machine_config,
206            } => Self::SyncExecutionEnvironment {
207                id: *id,
208                update_machine_config: *update_machine_config,
209            },
210            Self::LlmCall { id, request } => Self::LlmCall {
211                id: *id,
212                request: Arc::clone(request),
213            },
214            Self::CancelLlm { id } => Self::CancelLlm { id: *id },
215            Self::ToolCalls { id, calls } => Self::ToolCalls {
216                id: *id,
217                calls: calls.clone(),
218            },
219            Self::ExecCode { id, code } => Self::ExecCode {
220                id: *id,
221                code: code.clone(),
222            },
223            Self::Checkpoint { id, checkpoint } => Self::Checkpoint {
224                id: *id,
225                checkpoint: *checkpoint,
226            },
227            Self::Log { event } => Self::Log {
228                event: event.clone(),
229            },
230            Self::Emit(event) => Self::Emit(event.clone()),
231            Self::Progress {
232                messages,
233                event_delta,
234                protocol_iteration,
235            } => Self::Progress {
236                messages: messages.clone(),
237                event_delta: event_delta.clone(),
238                protocol_iteration: *protocol_iteration,
239            },
240            Self::Done {
241                messages,
242                event_delta,
243                protocol_iteration,
244            } => Self::Done {
245                messages: messages.clone(),
246                event_delta: event_delta.clone(),
247                protocol_iteration: *protocol_iteration,
248            },
249        }
250    }
251}
252
253impl<M: TurnProtocol> Effect<M> {
254    fn id(&self) -> Option<EffectId> {
255        match self {
256            Self::SyncExecutionEnvironment { id, .. }
257            | Self::LlmCall { id, .. }
258            | Self::CancelLlm { id }
259            | Self::ToolCalls { id, .. }
260            | Self::ExecCode { id, .. }
261            | Self::Checkpoint { id, .. } => Some(*id),
262            Self::Log { .. } | Self::Emit(_) | Self::Progress { .. } | Self::Done { .. } => None,
263        }
264    }
265}
266
267/// Error details from a failed LLM call.
268#[derive(Clone, Debug, Serialize, serde::Deserialize)]
269pub struct LlmCallError {
270    pub message: String,
271    pub retryable: bool,
272    pub raw: Option<String>,
273    pub code: Option<String>,
274    pub terminal_reason: LlmTerminalReason,
275    pub request_body: Option<String>,
276}
277
278/// A response to a previously emitted effect.
279pub enum Response {
280    /// Live execution environment sync completed.
281    ExecutionEnvironmentSynced {
282        id: EffectId,
283        result: Result<Option<ExecutionEnvironmentSync>, String>,
284    },
285    /// Full LLM response.
286    LlmComplete {
287        id: EffectId,
288        result: Result<LlmResponse, LlmCallError>,
289        /// When true, text deltas were already emitted during streaming,
290        /// so the driver should skip emitting `TextDelta` events.
291        text_streamed: bool,
292    },
293    /// Native tool results.
294    ToolResults {
295        id: EffectId,
296        results: Vec<CompletedToolCall>,
297    },
298    /// Mode code execution result.
299    ExecResult {
300        id: EffectId,
301        result: Result<crate::ExecResponse, String>,
302    },
303    /// Checkpoint result with optional injected messages.
304    Checkpoint {
305        id: EffectId,
306        delivery: CheckpointDelivery,
307    },
308}
309
310#[derive(Clone, Debug, Serialize, serde::Deserialize)]
311pub struct ExecutionEnvironmentSync {
312    pub system_prompt: Arc<str>,
313    pub tool_specs: Arc<Vec<LlmToolSpec>>,
314}
315
316pub struct WaitingLlmState<M: TurnProtocol = UnitTurnProtocol> {
317    pub request: Arc<LlmRequest>,
318    driver_state: Option<M::DriverState>,
319}
320
321impl<M: TurnProtocol> WaitingLlmState<M> {
322    pub fn take_driver_state(&mut self) -> Option<M::DriverState> {
323        self.driver_state.take()
324    }
325}
326
327pub struct WaitingExecState<M: TurnProtocol = UnitTurnProtocol> {
328    driver_state: M::DriverState,
329}
330
331impl<M: TurnProtocol> WaitingExecState<M> {
332    pub fn into_driver_state(self) -> M::DriverState {
333        self.driver_state
334    }
335}
336
337#[derive(Clone, Debug, PartialEq, Serialize, serde::Deserialize)]
338pub enum CheckpointResumeAction {
339    PrepareIteration,
340    Finish(TurnOutcome),
341}
342
343#[allow(clippy::large_enum_variant)]
344pub enum DriverAction<M: TurnProtocol = UnitTurnProtocol> {
345    Emit(SessionEvent),
346    AppendEvents(Vec<SessionEventRecord<M::Event>>),
347    StartLlm {
348        request: Arc<LlmRequest>,
349        driver_state: Option<M::DriverState>,
350    },
351    StartTools {
352        calls: Vec<PendingToolCall>,
353    },
354    StartExec {
355        code: String,
356        driver_state: M::DriverState,
357    },
358    StartCheckpoint {
359        checkpoint: CheckpointKind,
360        on_empty: CheckpointResumeAction,
361    },
362    AdvanceProtocolIteration,
363    ScheduleTurnLimitFinal {
364        message: Message,
365    },
366    Finish(TurnOutcome),
367}
368
369pub struct DriverContextView<'a, M: TurnProtocol = UnitTurnProtocol> {
370    config: &'a TurnMachineConfig<M>,
371    messages: &'a MessageSequence,
372    events: &'a [SessionEventRecord<M::Event>],
373    turn_causes: &'a [TurnCause],
374    protocol_iteration: usize,
375    protocol_run_offset: usize,
376    termination: &'a TurnTerminationPolicyState,
377}
378
379impl<'a, M: TurnProtocol> DriverContextView<'a, M> {
380    pub fn project_llm_request(&self, use_tools: bool) -> Arc<LlmRequest> {
381        self.config.projector.project(ProjectorContext {
382            config: self.config,
383            messages: self.messages,
384            events: self.events,
385            turn_causes: self.turn_causes,
386            protocol_iteration: self.protocol_iteration,
387            use_tools,
388        })
389    }
390
391    pub fn protocol_iteration(&self) -> usize {
392        self.protocol_iteration
393    }
394
395    pub fn protocol_run_offset(&self) -> usize {
396        self.protocol_run_offset
397    }
398
399    pub fn max_turns(&self) -> Option<usize> {
400        self.config.max_turns
401    }
402
403    pub fn termination(&self) -> &M::Termination {
404        &self.config.termination
405    }
406
407    pub fn autonomous(&self) -> bool {
408        self.config.autonomous
409    }
410
411    pub fn should_force_exit_after_grace_turn(&self) -> bool {
412        self.termination.should_force_exit_after_grace_turn()
413    }
414
415    pub fn turn_limit_final_to_schedule(&self) -> Option<usize> {
416        self.termination.turn_limit_final_to_schedule(
417            self.protocol_iteration,
418            self.protocol_run_offset,
419            self.config.max_turns,
420        )
421    }
422
423    pub fn messages(&self) -> &MessageSequence {
424        self.messages
425    }
426
427    pub fn events(&self) -> &[SessionEventRecord<M::Event>] {
428        self.events
429    }
430
431    pub fn turn_causes(&self) -> &[TurnCause] {
432        self.turn_causes
433    }
434}
435
436pub struct ProjectorContext<'a, M: TurnProtocol = UnitTurnProtocol> {
437    pub config: &'a TurnMachineConfig<M>,
438    pub messages: &'a MessageSequence,
439    pub events: &'a [SessionEventRecord<M::Event>],
440    pub turn_causes: &'a [TurnCause],
441    pub protocol_iteration: usize,
442    pub use_tools: bool,
443}
444
445pub trait ContextProjector<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
446    fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest>;
447}
448
/// Stateless [`ContextProjector`] that renders the message sequence as a chat
/// prompt, with an optional leading system message and trailing turn-events
/// message.
#[derive(Debug, Default, Clone)]
pub struct ChatContextProjector;
451
452impl<M: TurnProtocol> ContextProjector<M> for ChatContextProjector {
453    fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest> {
454        let rendered_prompt = render_messages_for_projector(ctx.messages, ctx.turn_causes);
455        let attachments: Vec<LlmAttachment> = rendered_prompt.attachments;
456        let mut messages = rendered_prompt.messages;
457        if let Some(turn_events) = render_turn_causes_prompt(ctx.turn_causes) {
458            messages.push(crate::llm::types::LlmMessage::text(
459                crate::llm::types::LlmRole::User,
460                Arc::from(turn_events),
461            ));
462        }
463        if !ctx.config.system_prompt.trim().is_empty() {
464            messages.insert(
465                0,
466                crate::llm::types::LlmMessage::text(
467                    crate::llm::types::LlmRole::System,
468                    Arc::clone(&ctx.config.system_prompt),
469                ),
470            );
471        }
472
473        Arc::new(LlmRequest {
474            model: ctx.config.model.clone(),
475            messages,
476            attachments,
477            tools: if ctx.use_tools {
478                Arc::clone(&ctx.config.tool_specs)
479            } else {
480                Arc::new(Vec::new())
481            },
482            tool_choice: if ctx.use_tools {
483                LlmToolChoice::Auto
484            } else {
485                LlmToolChoice::None
486            },
487            model_variant: ctx.config.model_variant.clone(),
488            generation: ctx.config.generation.clone(),
489            session_id: ctx.config.run_session_id.clone(),
490            output_spec: None,
491            stream_events: None,
492            provider_trace: None,
493        })
494    }
495}
496
497fn render_messages_for_projector(
498    messages: &MessageSequence,
499    turn_causes: &[TurnCause],
500) -> crate::RenderedPrompt {
501    if turn_causes.is_empty() {
502        return messages.render_prompt();
503    }
504
505    let active_cause_ids = turn_causes
506        .iter()
507        .map(|cause| cause.id.as_str())
508        .collect::<HashSet<_>>();
509    let filtered = messages
510        .iter()
511        .filter(|message| {
512            !(matches!(message.role, MessageRole::Event)
513                && active_cause_ids.contains(message.id.as_str()))
514        })
515        .cloned()
516        .collect::<Vec<_>>();
517    render_prompt(filtered.as_slice())
518}
519
520pub trait ProtocolDriverHandle<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
521    fn prepare_protocol_iteration(&self, ctx: DriverContextView<'_, M>) -> Vec<DriverAction<M>>;
522    fn handle_llm_success(
523        &self,
524        ctx: DriverContextView<'_, M>,
525        waiting: WaitingLlmState<M>,
526        llm_response: LlmResponse,
527        text_streamed: bool,
528    ) -> Vec<DriverAction<M>>;
529    fn handle_tool_results(
530        &self,
531        ctx: DriverContextView<'_, M>,
532        completed: Vec<CompletedToolCall>,
533    ) -> Vec<DriverAction<M>>;
534    fn handle_exec_result(
535        &self,
536        ctx: DriverContextView<'_, M>,
537        waiting: WaitingExecState<M>,
538        result: Result<crate::ExecResponse, String>,
539    ) -> Vec<DriverAction<M>>;
540}
541
542/// Configuration for a `TurnMachine` instance.
543pub struct TurnMachineConfig<M: TurnProtocol = UnitTurnProtocol> {
544    pub protocol_driver: Arc<dyn ProtocolDriverHandle<M>>,
545    pub projector: Arc<dyn ContextProjector<M>>,
546    pub sync_execution_environment: bool,
547    pub model: String,
548    /// Model context-window size in tokens, if known. Lets the kernel
549    /// reclassify a zero-output `OutputLimit` terminal reason as
550    /// `ContextOverflow` when the prompt nearly filled the window. `None`
551    /// disables that refinement.
552    pub max_context_tokens: Option<usize>,
553    pub max_turns: Option<usize>,
554    pub model_variant: Option<String>,
555    pub generation: crate::llm::types::GenerationOptions,
556    pub run_session_id: Option<String>,
557    pub autonomous: bool,
558    pub tool_specs: Arc<Vec<LlmToolSpec>>,
559    pub system_prompt: Arc<str>,
560    pub session_id: String,
561    pub emit_llm_trace: bool,
562    pub termination: M::Termination,
563    pub turn_limit_final_message: crate::TurnLimitFinalMessage,
564}
565
566// ─── Internal state ───