Skip to main content

lash_sansio/sansio/sections/
turn_protocol.rs

// `UnitTurnProtocol` is the default `M` parameter of the generic types in this
// file (`Effect`, `DriverAction`, `TurnMachineConfig`, ...).
impl TurnProtocol for UnitTurnProtocol {
    // Unit type: no protocol-specific event payload.
    type Event = ();
    // Unit type: no protocol-specific termination configuration.
    type Termination = ();
    // Driver state is carried as an untyped JSON value.
    type DriverState = serde_json::Value;
}
6
/// Opaque identifier linking an effect to its response.
///
/// Carried by the `Effect` variants that have an `id` field and by every
/// `Response` variant. The wrapped `u64` is public, but nothing in this file
/// interprets its value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, serde::Deserialize)]
pub struct EffectId(pub u64);
10
/// A tool call scheduled for execution, as carried by `Effect::ToolCalls` and
/// `DriverAction::StartTools`.
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct PendingToolCall {
    /// Identifier of this call. `CompletedToolCall` has a field of the same
    /// name — presumably used to pair a result with its request; confirm with
    /// the host implementation.
    pub call_id: String,
    /// Name of the tool to invoke.
    pub tool_name: String,
    /// Arguments for the tool, as a JSON-style `Value`.
    pub args: Value,
    /// Opaque provider replay state carried through for the next request.
    pub replay: Option<ProviderReplayMeta>,
}
19
/// The outcome of a tool call, as carried by `Response::ToolResults` and passed
/// to `ProtocolDriverHandle::handle_tool_results`.
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct CompletedToolCall {
    /// Identifier of the call (same field name as on `PendingToolCall`).
    pub call_id: String,
    /// Name of the tool that was invoked.
    pub tool_name: String,
    /// Arguments the tool was invoked with.
    pub args: Value,
    /// Output produced by the tool.
    pub output: ToolCallOutput,
    /// NOTE(review): presumably the representation of the result that is shown
    /// to the model, as opposed to the raw `output` — confirm against
    /// `ModelToolReturn`.
    pub model_return: ModelToolReturn,
    /// Duration in milliseconds (per the field name); what exactly is timed is
    /// decided by whoever fills this in.
    pub duration_ms: u64,
    /// See [`PendingToolCall::replay`].
    pub replay: Option<ProviderReplayMeta>,
}
31
/// An event associated with the current turn.
///
/// Causes are rendered into the prompt by `render_turn_causes_prompt` and can
/// be converted into an `Event`-role message with
/// [`TurnCause::to_event_message`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, serde::Deserialize)]
pub struct TurnCause {
    /// Identifier of the cause; reused as the id of the event message built by
    /// `to_event_message`.
    pub id: String,
    /// Event type label, shown in the rendered event header.
    pub event_type: String,
    /// Where the event came from (plugin or process).
    pub origin: MessageOrigin,
    /// Body text of the event.
    pub text: String,
}
39
40impl TurnCause {
41    pub fn to_event_message(&self) -> Message {
42        Message {
43            id: self.id.clone(),
44            role: MessageRole::Event,
45            parts: Arc::new(vec![Part {
46                id: format!("{}.p0", self.id),
47                kind: PartKind::Text,
48                content: self.text.clone(),
49                attachment: None,
50                tool_call_id: None,
51                tool_name: None,
52                tool_replay: None,
53                prune_state: PruneState::Intact,
54                reasoning_meta: None,
55                response_meta: None,
56            }]),
57            origin: Some(self.origin.clone()),
58        }
59    }
60}
61
/// Payload of `Response::Checkpoint`: what a checkpoint hands back to the
/// machine. `Default` yields three empty lists.
#[derive(Clone, Debug, Default, Serialize, serde::Deserialize)]
pub struct CheckpointDelivery {
    /// Messages injected by the checkpoint.
    pub messages: Vec<PluginMessage>,
    /// NOTE(review): presumably messages that are shown to the model but not
    /// persisted (cf. `MessageOrigin::Plugin::transient`) — confirm.
    pub transient_messages: Vec<PluginMessage>,
    /// Turn causes delivered by the checkpoint.
    pub turn_causes: Vec<TurnCause>,
}
68
69pub fn render_turn_causes_prompt(causes: &[TurnCause]) -> Option<String> {
70    if causes.is_empty() {
71        return None;
72    }
73
74    let mut rendered = String::from("=== TURN EVENTS ===");
75    for (index, cause) in causes.iter().enumerate() {
76        rendered.push_str("\n\n");
77        rendered.push_str(&format!(
78            "--- event[{index}] · {} · {} ---\n",
79            cause.event_type, cause.id
80        ));
81        rendered.push_str("Origin: ");
82        rendered.push_str(&render_message_origin(&cause.origin));
83        rendered.push_str("\n\n");
84        rendered.push_str(cause.text.trim());
85    }
86    Some(rendered)
87}
88
89fn render_message_origin(origin: &MessageOrigin) -> String {
90    match origin {
91        MessageOrigin::Plugin {
92            plugin_id,
93            transient,
94        } => {
95            if *transient {
96                format!("plugin {plugin_id} (transient)")
97            } else {
98                format!("plugin {plugin_id}")
99            }
100        }
101        MessageOrigin::Process {
102            process_id,
103            event_type,
104            sequence,
105            wake_id,
106            ..
107        } => match wake_id {
108            Some(wake_id) => {
109                format!("process {process_id} {event_type} #{sequence} ({wake_id})")
110            }
111            None => format!("process {process_id} {event_type} #{sequence}"),
112        },
113    }
114}
115
/// Payload of `Effect::Log`, which the host handles as fire-and-forget logging.
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub enum LogEvent {
    /// Debug record of an LLM exchange: token usage, the optional request
    /// body, and the response text / parts.
    LlmDebug {
        session_id: String,
        protocol_iteration: usize,
        usage: TokenUsage,
        // Provider-specific usage payload, kept as untyped JSON.
        provider_usage: Option<Value>,
        request_body: Option<String>,
        response_text: String,
        response_parts: Option<Value>,
    },
    /// Record of a failed LLM call. Apart from `session_id` and
    /// `protocol_iteration`, the fields share names and types with those of
    /// `LlmCallError` (minus its `kind`).
    LlmError {
        session_id: String,
        protocol_iteration: usize,
        request_body: Option<String>,
        message: String,
        retryable: bool,
        raw: Option<String>,
        code: Option<String>,
        terminal_reason: LlmTerminalReason,
    },
}
138
/// An effect the host must fulfil.
///
/// Variants that carry an `id: EffectId` are the ones `Effect::id` reports;
/// `Log`, `Emit`, `Progress` and `Done` have no id.
//
// `Clone` is implemented by hand below rather than derived: the derive would
// demand `M: Clone`, but only `M::Event` is ever cloned (and `TurnProtocol`
// already guarantees `Event: Clone`), so a manual impl keeps `Effect<M>`
// cloneable for every protocol — which the turn checkpoint relies on.
#[derive(Debug, Serialize, serde::Deserialize)]
// NOTE(review): no rationale is recorded here for silencing
// `large_enum_variant`; presumably boxing the large variants was judged not
// worthwhile — confirm.
#[allow(clippy::large_enum_variant)]
pub enum Effect<M: TurnProtocol = UnitTurnProtocol> {
    /// Sync the live execution environment before the turn proceeds.
    ///
    /// `update_machine_config` is only needed after the turn has
    /// already advanced at least once and the host may need to swap in
    /// a refreshed system prompt or tool schema for the next
    /// protocol iteration. Initial syncs are host-only because the machine was
    /// already constructed from a fresh execution environment.
    SyncExecutionEnvironment {
        id: EffectId,
        update_machine_config: bool,
    },
    /// Start an LLM call.
    LlmCall {
        id: EffectId,
        // Shared via `Arc`, so cloning the effect does not copy the request.
        request: Arc<LlmRequest>,
    },
    /// Cancel an in-progress LLM stream.
    CancelLlm { id: EffectId },
    /// Execute one or more driver-scheduled tool calls.
    ToolCalls {
        id: EffectId,
        calls: Vec<PendingToolCall>,
    },
    /// Execute a protocol-owned code block.
    ExecCode {
        id: EffectId,
        language: String,
        code: String,
    },
    /// Run a host/plugin checkpoint before the machine continues or completes.
    Checkpoint {
        id: EffectId,
        checkpoint: CheckpointKind,
    },
    /// Host-implemented fire-and-forget logging.
    Log { event: LogEvent },
    /// Fire-and-forget event (no response needed).
    Emit(SessionEvent),
    /// Prompt-history progress that may be durably persisted by the host.
    ///
    /// This is separate from [`SessionEvent`]: UI stream events can be partial,
    /// duplicated, or display-only, while `Progress` is emitted only after the
    /// state machine has applied semantic message or protocol-step changes.
    Progress {
        messages: MessageSequence,
        event_delta: Vec<SessionEventRecord<M::Event>>,
        protocol_iteration: usize,
    },
    /// Turn is done. Carries the same fields as [`Effect::Progress`].
    Done {
        messages: MessageSequence,
        event_delta: Vec<SessionEventRecord<M::Event>>,
        protocol_iteration: usize,
    },
}
203
204impl<M: TurnProtocol> Clone for Effect<M> {
205    fn clone(&self) -> Self {
206        match self {
207            Self::SyncExecutionEnvironment {
208                id,
209                update_machine_config,
210            } => Self::SyncExecutionEnvironment {
211                id: *id,
212                update_machine_config: *update_machine_config,
213            },
214            Self::LlmCall { id, request } => Self::LlmCall {
215                id: *id,
216                request: Arc::clone(request),
217            },
218            Self::CancelLlm { id } => Self::CancelLlm { id: *id },
219            Self::ToolCalls { id, calls } => Self::ToolCalls {
220                id: *id,
221                calls: calls.clone(),
222            },
223            Self::ExecCode { id, language, code } => Self::ExecCode {
224                id: *id,
225                language: language.clone(),
226                code: code.clone(),
227            },
228            Self::Checkpoint { id, checkpoint } => Self::Checkpoint {
229                id: *id,
230                checkpoint: *checkpoint,
231            },
232            Self::Log { event } => Self::Log {
233                event: event.clone(),
234            },
235            Self::Emit(event) => Self::Emit(event.clone()),
236            Self::Progress {
237                messages,
238                event_delta,
239                protocol_iteration,
240            } => Self::Progress {
241                messages: messages.clone(),
242                event_delta: event_delta.clone(),
243                protocol_iteration: *protocol_iteration,
244            },
245            Self::Done {
246                messages,
247                event_delta,
248                protocol_iteration,
249            } => Self::Done {
250                messages: messages.clone(),
251                event_delta: event_delta.clone(),
252                protocol_iteration: *protocol_iteration,
253            },
254        }
255    }
256}
257
258impl<M: TurnProtocol> Effect<M> {
259    fn id(&self) -> Option<EffectId> {
260        match self {
261            Self::SyncExecutionEnvironment { id, .. }
262            | Self::LlmCall { id, .. }
263            | Self::CancelLlm { id }
264            | Self::ToolCalls { id, .. }
265            | Self::ExecCode { id, .. }
266            | Self::Checkpoint { id, .. } => Some(*id),
267            Self::Log { .. } | Self::Emit(_) | Self::Progress { .. } | Self::Done { .. } => None,
268        }
269    }
270}
271
/// Error details from a failed LLM call.
///
/// This is the `Err` payload of `Response::LlmComplete::result`.
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct LlmCallError {
    /// Error message text.
    pub message: String,
    /// Whether the failure is flagged as retryable.
    pub retryable: bool,
    /// Typed transport classification of the failure. Defaults to
    /// [`ProviderFailureKind::Unknown`] for wrappers that are not provider
    /// failures (and when decoding effect journals written before the field
    /// existed).
    #[serde(default)]
    pub kind: crate::llm::types::ProviderFailureKind,
    /// NOTE(review): presumably the raw provider error payload, if one was
    /// captured — confirm.
    pub raw: Option<String>,
    /// Optional error code as a string (the test below uses `"429"`).
    pub code: Option<String>,
    /// Terminal reason classification for the call.
    pub terminal_reason: LlmTerminalReason,
    /// Body of the request that failed, when available.
    pub request_body: Option<String>,
}
288
/// A response to a previously emitted effect.
///
/// Every variant carries the [`EffectId`] of the effect it answers. Unlike
/// `Effect`, this enum derives nothing here (no `Clone`, `Debug` or serde).
pub enum Response {
    /// Live execution environment sync completed.
    ExecutionEnvironmentSynced {
        id: EffectId,
        // `Ok(None)` is possible: a successful sync that carries no refreshed
        // prompt or tool specs. `Err` holds a plain error string.
        result: Result<Option<ExecutionEnvironmentSync>, String>,
    },
    /// Full LLM response.
    LlmComplete {
        id: EffectId,
        result: Result<LlmResponse, LlmCallError>,
        /// When true, text deltas were already emitted during streaming,
        /// so the driver should skip emitting `TextDelta` events.
        text_streamed: bool,
    },
    /// Native tool results.
    ToolResults {
        id: EffectId,
        results: Vec<CompletedToolCall>,
    },
    /// Mode code execution result.
    ExecResult {
        id: EffectId,
        // `Err` holds a plain error string.
        result: Result<crate::ExecResponse, String>,
    },
    /// Checkpoint result with optional injected messages.
    Checkpoint {
        id: EffectId,
        delivery: CheckpointDelivery,
    },
}
320
/// System prompt and tool specs returned by an execution-environment sync
/// (the `Ok(Some(..))` payload of `Response::ExecutionEnvironmentSynced`).
///
/// Both fields have the same types as the `system_prompt` / `tool_specs`
/// fields of `TurnMachineConfig`.
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct ExecutionEnvironmentSync {
    pub system_prompt: Arc<str>,
    pub tool_specs: Arc<Vec<LlmToolSpec>>,
}
326
/// State handed to `ProtocolDriverHandle::handle_llm_success` alongside the
/// LLM response.
pub struct WaitingLlmState<M: TurnProtocol = UnitTurnProtocol> {
    /// The LLM request held by this state.
    pub request: Arc<LlmRequest>,
    // Optional driver state; private, extracted via `take_driver_state`.
    // NOTE(review): presumably the `driver_state` supplied with
    // `DriverAction::StartLlm` — confirm.
    driver_state: Option<M::DriverState>,
}
331
332impl<M: TurnProtocol> WaitingLlmState<M> {
333    pub fn take_driver_state(&mut self) -> Option<M::DriverState> {
334        self.driver_state.take()
335    }
336}
337
/// State handed to `ProtocolDriverHandle::handle_exec_result` alongside the
/// execution result.
pub struct WaitingExecState<M: TurnProtocol = UnitTurnProtocol> {
    // Private; extracted via `into_driver_state`.
    // NOTE(review): presumably the `driver_state` supplied with
    // `DriverAction::StartExec` — confirm.
    driver_state: M::DriverState,
}
341
342impl<M: TurnProtocol> WaitingExecState<M> {
343    pub fn into_driver_state(self) -> M::DriverState {
344        self.driver_state
345    }
346}
347
/// How to resume after a checkpoint; supplied as the `on_empty` field of
/// `DriverAction::StartCheckpoint`.
///
/// NOTE(review): judging by the field name, this is presumably applied when
/// the checkpoint delivers nothing — confirm in the machine implementation.
#[derive(Clone, Debug, PartialEq, Serialize, serde::Deserialize)]
pub enum CheckpointResumeAction {
    /// Resume by preparing a protocol iteration (cf.
    /// `ProtocolDriverHandle::prepare_protocol_iteration`).
    PrepareIteration,
    /// Resume by finishing the turn with the given outcome.
    Finish(TurnOutcome),
}
353
/// An action returned by a protocol driver; every `ProtocolDriverHandle`
/// method returns a `Vec<DriverAction<M>>`.
///
/// NOTE(review): several variants mirror `Effect` variants by payload
/// (`StartLlm`/`LlmCall`, `StartTools`/`ToolCalls`, `StartExec`/`ExecCode`,
/// `StartCheckpoint`/`Checkpoint`); presumably the machine turns them into
/// those effects — confirm.
#[allow(clippy::large_enum_variant)]
pub enum DriverAction<M: TurnProtocol = UnitTurnProtocol> {
    /// Carries a session event.
    Emit(SessionEvent),
    /// Carries session event records to append.
    AppendEvents(Vec<SessionEventRecord<M::Event>>),
    /// Start an LLM call with `request`, with optional driver state attached.
    StartLlm {
        request: Arc<LlmRequest>,
        driver_state: Option<M::DriverState>,
    },
    /// Start the given tool calls.
    StartTools {
        calls: Vec<PendingToolCall>,
    },
    /// Start executing `code` in `language`, with driver state attached.
    StartExec {
        language: String,
        code: String,
        driver_state: M::DriverState,
    },
    /// Start a checkpoint of the given kind.
    StartCheckpoint {
        checkpoint: CheckpointKind,
        on_empty: CheckpointResumeAction,
    },
    /// Advance the protocol iteration.
    AdvanceProtocolIteration,
    /// Schedule the turn-limit final message.
    ScheduleTurnLimitFinal {
        message: Message,
    },
    /// Finish the turn with the given outcome.
    Finish(TurnOutcome),
}
380
/// Borrowed view of turn state passed to every `ProtocolDriverHandle` method.
///
/// All fields are private; drivers read them through the accessor methods.
pub struct DriverContextView<'a, M: TurnProtocol = UnitTurnProtocol> {
    // Machine configuration (model, prompt, projector, limits, ...).
    config: &'a TurnMachineConfig<M>,
    // Message history visible to the driver.
    messages: &'a MessageSequence,
    // Session event records visible to the driver.
    events: &'a [SessionEventRecord<M::Event>],
    // Turn causes; forwarded to the projector when building a request.
    turn_causes: &'a [TurnCause],
    // Current protocol iteration; also used in the LLM request scope string.
    protocol_iteration: usize,
    // Passed to the turn-limit check together with `protocol_iteration`.
    // NOTE(review): presumably the iteration count at which the current run
    // started — confirm.
    protocol_run_offset: usize,
    // Termination-policy state queried by the grace-turn / turn-limit helpers.
    termination: &'a TurnTerminationPolicyState,
}
390
391impl<'a, M: TurnProtocol> DriverContextView<'a, M> {
392    pub fn project_llm_request(&self, use_tools: bool) -> Arc<LlmRequest> {
393        self.config.projector.project(ProjectorContext {
394            config: self.config,
395            messages: self.messages,
396            events: self.events,
397            turn_causes: self.turn_causes,
398            protocol_iteration: self.protocol_iteration,
399            use_tools,
400        })
401    }
402
403    pub fn protocol_iteration(&self) -> usize {
404        self.protocol_iteration
405    }
406
407    pub fn protocol_run_offset(&self) -> usize {
408        self.protocol_run_offset
409    }
410
411    pub fn max_turns(&self) -> Option<usize> {
412        self.config.max_turns
413    }
414
415    pub fn termination(&self) -> &M::Termination {
416        &self.config.termination
417    }
418
419    pub fn autonomous(&self) -> bool {
420        self.config.autonomous
421    }
422
423    pub fn should_force_exit_after_grace_turn(&self) -> bool {
424        self.termination.should_force_exit_after_grace_turn()
425    }
426
427    pub fn turn_limit_final_to_schedule(&self) -> Option<usize> {
428        self.termination.turn_limit_final_to_schedule(
429            self.protocol_iteration,
430            self.protocol_run_offset,
431            self.config.max_turns,
432        )
433    }
434
435    pub fn messages(&self) -> &MessageSequence {
436        self.messages
437    }
438
439    pub fn events(&self) -> &[SessionEventRecord<M::Event>] {
440        self.events
441    }
442
443    pub fn turn_causes(&self) -> &[TurnCause] {
444        self.turn_causes
445    }
446}
447
/// Inputs handed to a [`ContextProjector`] when it builds an `LlmRequest`.
///
/// Constructed by `DriverContextView::project_llm_request`.
pub struct ProjectorContext<'a, M: TurnProtocol = UnitTurnProtocol> {
    /// Machine configuration (model, system prompt, tool specs, ...).
    pub config: &'a TurnMachineConfig<M>,
    /// Message history to project.
    pub messages: &'a MessageSequence,
    /// Session event records.
    pub events: &'a [SessionEventRecord<M::Event>],
    /// Turn causes.
    pub turn_causes: &'a [TurnCause],
    /// Current protocol iteration.
    pub protocol_iteration: usize,
    /// Whether the request should offer tools to the model.
    pub use_tools: bool,
}
456
/// Strategy for turning turn state into an `LlmRequest`.
///
/// Stored in `TurnMachineConfig::projector` as an `Arc<dyn ContextProjector<M>>`,
/// hence the `Send + Sync` bound.
pub trait ContextProjector<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
    /// Builds the request for the given context.
    fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest>;
}
460
/// Stateless [`ContextProjector`] implementation; see its `project` impl below
/// for the request layout it produces.
#[derive(Clone, Debug, Default)]
pub struct ChatContextProjector;
463
464impl<M: TurnProtocol> ContextProjector<M> for ChatContextProjector {
465    fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest> {
466        let rendered_prompt = render_messages_for_projector(ctx.messages, ctx.turn_causes);
467        let attachments: Vec<LlmAttachment> = rendered_prompt.attachments;
468        let mut messages = rendered_prompt.messages;
469        if let Some(turn_events) = render_turn_causes_prompt(ctx.turn_causes) {
470            messages.push(crate::llm::types::LlmMessage::text(
471                crate::llm::types::LlmRole::User,
472                Arc::from(turn_events),
473            ));
474        }
475        if !ctx.config.system_prompt.trim().is_empty() {
476            messages.insert(
477                0,
478                crate::llm::types::LlmMessage::text(
479                    crate::llm::types::LlmRole::System,
480                    Arc::clone(&ctx.config.system_prompt),
481                ),
482            );
483        }
484
485        Arc::new(LlmRequest {
486            model: ctx.config.model.clone(),
487            messages,
488            attachments,
489            tools: if ctx.use_tools {
490                Arc::clone(&ctx.config.tool_specs)
491            } else {
492                Arc::new(Vec::new())
493            },
494            tool_choice: if ctx.use_tools {
495                LlmToolChoice::Auto
496            } else {
497                LlmToolChoice::None
498            },
499            model_variant: ctx.config.model_variant.clone(),
500            generation: ctx.config.generation.clone(),
501            scope: crate::llm::types::LlmRequestScope::new(
502                ctx.config.session_id.clone(),
503                format!("{}:frame:sansio", ctx.config.session_id),
504                format!(
505                    "{}:sansio:llm:{}",
506                    ctx.config.session_id, ctx.protocol_iteration
507                ),
508            ),
509            output_spec: None,
510            stream_events: None,
511            provider_trace: None,
512        })
513    }
514}
515
516fn render_messages_for_projector(
517    messages: &MessageSequence,
518    turn_causes: &[TurnCause],
519) -> crate::RenderedPrompt {
520    if turn_causes.is_empty() {
521        return messages.render_prompt();
522    }
523
524    let active_cause_ids = turn_causes
525        .iter()
526        .map(|cause| cause.id.as_str())
527        .collect::<HashSet<_>>();
528    let filtered = messages
529        .iter()
530        .filter(|message| {
531            !(matches!(message.role, MessageRole::Event)
532                && active_cause_ids.contains(message.id.as_str()))
533        })
534        .cloned()
535        .collect::<Vec<_>>();
536    render_prompt(filtered.as_slice())
537}
538
/// Protocol driver interface: each method receives a borrowed view of the turn
/// state and returns the list of [`DriverAction`]s to apply.
///
/// Stored in `TurnMachineConfig::protocol_driver` as an
/// `Arc<dyn ProtocolDriverHandle<M>>`, hence the `Send + Sync` bound.
pub trait ProtocolDriverHandle<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
    /// Called to prepare a protocol iteration.
    fn prepare_protocol_iteration(&self, ctx: DriverContextView<'_, M>) -> Vec<DriverAction<M>>;
    /// Called with a successful LLM response.
    ///
    /// `waiting` holds the request and any driver state; `text_streamed`
    /// has the meaning documented on `Response::LlmComplete`.
    fn handle_llm_success(
        &self,
        ctx: DriverContextView<'_, M>,
        waiting: WaitingLlmState<M>,
        llm_response: LlmResponse,
        text_streamed: bool,
    ) -> Vec<DriverAction<M>>;
    /// Called with the completed tool calls.
    fn handle_tool_results(
        &self,
        ctx: DriverContextView<'_, M>,
        completed: Vec<CompletedToolCall>,
    ) -> Vec<DriverAction<M>>;
    /// Called with the result of a code execution, successful or not.
    ///
    /// `waiting` holds the driver state for the execution.
    fn handle_exec_result(
        &self,
        ctx: DriverContextView<'_, M>,
        waiting: WaitingExecState<M>,
        result: Result<crate::ExecResponse, String>,
    ) -> Vec<DriverAction<M>>;
}
560
/// Configuration for a `TurnMachine` instance.
pub struct TurnMachineConfig<M: TurnProtocol = UnitTurnProtocol> {
    /// Driver that decides which actions to take at each step.
    pub protocol_driver: Arc<dyn ProtocolDriverHandle<M>>,
    /// Projector used by `DriverContextView::project_llm_request` to build
    /// each `LlmRequest`.
    pub projector: Arc<dyn ContextProjector<M>>,
    /// NOTE(review): presumably enables the `Effect::SyncExecutionEnvironment`
    /// step — confirm in the machine implementation.
    pub sync_execution_environment: bool,
    /// Model name; copied into every request by `ChatContextProjector`.
    pub model: String,
    /// Model context-window size in tokens, if known. Lets the kernel
    /// reclassify a zero-output `OutputLimit` terminal reason as
    /// `ContextOverflow` when the prompt nearly filled the window. `None`
    /// disables that refinement.
    pub max_context_tokens: Option<usize>,
    /// Optional turn limit; exposed via `DriverContextView::max_turns` and fed
    /// into the turn-limit check.
    pub max_turns: Option<usize>,
    /// Optional model variant; copied into every request by
    /// `ChatContextProjector`.
    pub model_variant: Option<String>,
    /// Generation options; copied into every request by `ChatContextProjector`.
    pub generation: crate::llm::types::GenerationOptions,
    /// Exposed to drivers via `DriverContextView::autonomous`.
    pub autonomous: bool,
    /// Tool specs attached to requests by `ChatContextProjector` when tools
    /// are enabled.
    pub tool_specs: Arc<Vec<LlmToolSpec>>,
    /// System prompt; `ChatContextProjector` prepends it as a `System`
    /// message unless it is blank after trimming.
    pub system_prompt: Arc<str>,
    /// Session identifier; also used to build the request scope strings.
    pub session_id: String,
    /// NOTE(review): presumably toggles emission of `LogEvent::LlmDebug`
    /// traces — confirm.
    pub emit_llm_trace: bool,
    /// Protocol-specific termination value; exposed via
    /// `DriverContextView::termination`.
    pub termination: M::Termination,
    /// NOTE(review): presumably the source of the message carried by
    /// `DriverAction::ScheduleTurnLimitFinal` — confirm.
    pub turn_limit_final_message: crate::TurnLimitFinalMessage,
}
583
// Backward-compatibility tests for the serialized form of `LlmCallError`.
#[cfg(test)]
mod llm_call_error_tests {
    use super::LlmCallError;
    use crate::llm::types::ProviderFailureKind;

    /// A payload without the `kind` key must still deserialize, with `kind`
    /// falling back to its `#[serde(default)]` value.
    #[test]
    fn llm_call_error_decodes_journal_entries_that_predate_kind() {
        // `LlmCallError` is serialized inside durable effect journals
        // (`RuntimeEffectOutcome::LlmCall`). Entries written before the typed
        // `kind` field existed must decode with `Unknown`.
        let legacy = r#"{
            "message":"rate limited",
            "retryable":true,
            "raw":null,
            "code":"429",
            "terminal_reason":"provider_error",
            "request_body":null
        }"#;
        let decoded: LlmCallError = serde_json::from_str(legacy).expect("legacy call error");
        // The fields that are present decode normally...
        assert!(decoded.retryable);
        // ...and the missing `kind` takes the default.
        assert_eq!(decoded.kind, ProviderFailureKind::Unknown);
    }
}
607
608// ─── Internal state ───