1impl TurnProtocol for UnitTurnProtocol {
2 type Event = ();
3 type Termination = ();
4 type DriverState = serde_json::Value;
5}
6
7#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, serde::Deserialize)]
9pub struct EffectId(pub u64);
10
11#[derive(Clone, Debug, Serialize, serde::Deserialize)]
12pub struct PendingToolCall {
13 pub call_id: String,
14 pub tool_name: String,
15 pub args: Value,
16 pub replay: Option<ProviderReplayMeta>,
18}
19
20#[derive(Clone, Debug, Serialize, serde::Deserialize)]
21pub struct CompletedToolCall {
22 pub call_id: String,
23 pub tool_name: String,
24 pub args: Value,
25 pub output: ToolCallOutput,
26 pub model_return: ModelToolReturn,
27 pub duration_ms: u64,
28 pub replay: Option<ProviderReplayMeta>,
30}
31
32#[derive(Clone, Debug, PartialEq, Eq, Serialize, serde::Deserialize)]
33pub struct TurnCause {
34 pub id: String,
35 pub event_type: String,
36 pub origin: MessageOrigin,
37 pub text: String,
38}
39
40impl TurnCause {
41 pub fn to_event_message(&self) -> Message {
42 Message {
43 id: self.id.clone(),
44 role: MessageRole::Event,
45 parts: Arc::new(vec![Part {
46 id: format!("{}.p0", self.id),
47 kind: PartKind::Text,
48 content: self.text.clone(),
49 attachment: None,
50 tool_call_id: None,
51 tool_name: None,
52 tool_replay: None,
53 prune_state: PruneState::Intact,
54 reasoning_meta: None,
55 response_meta: None,
56 }]),
57 origin: Some(self.origin.clone()),
58 }
59 }
60}
61
62#[derive(Clone, Debug, Default, Serialize, serde::Deserialize)]
63pub struct CheckpointDelivery {
64 pub messages: Vec<PluginMessage>,
65 pub transient_messages: Vec<PluginMessage>,
66 pub turn_causes: Vec<TurnCause>,
67}
68
69pub fn render_turn_causes_prompt(causes: &[TurnCause]) -> Option<String> {
70 if causes.is_empty() {
71 return None;
72 }
73
74 let mut rendered = String::from("=== TURN EVENTS ===");
75 for (index, cause) in causes.iter().enumerate() {
76 rendered.push_str("\n\n");
77 rendered.push_str(&format!(
78 "--- event[{index}] · {} · {} ---\n",
79 cause.event_type, cause.id
80 ));
81 rendered.push_str("Origin: ");
82 rendered.push_str(&render_message_origin(&cause.origin));
83 rendered.push_str("\n\n");
84 rendered.push_str(cause.text.trim());
85 }
86 Some(rendered)
87}
88
89fn render_message_origin(origin: &MessageOrigin) -> String {
90 match origin {
91 MessageOrigin::Plugin {
92 plugin_id,
93 transient,
94 } => {
95 if *transient {
96 format!("plugin {plugin_id} (transient)")
97 } else {
98 format!("plugin {plugin_id}")
99 }
100 }
101 MessageOrigin::Process {
102 process_id,
103 event_type,
104 sequence,
105 wake_id,
106 ..
107 } => match wake_id {
108 Some(wake_id) => {
109 format!("process {process_id} {event_type} #{sequence} ({wake_id})")
110 }
111 None => format!("process {process_id} {event_type} #{sequence}"),
112 },
113 }
114}
115
116#[derive(Clone, Debug, Serialize, serde::Deserialize)]
117pub enum LogEvent {
118 LlmDebug {
119 session_id: String,
120 protocol_iteration: usize,
121 usage: TokenUsage,
122 provider_usage: Option<Value>,
123 request_body: Option<String>,
124 response_text: String,
125 response_parts: Option<Value>,
126 },
127 LlmError {
128 session_id: String,
129 protocol_iteration: usize,
130 request_body: Option<String>,
131 message: String,
132 retryable: bool,
133 raw: Option<String>,
134 code: Option<String>,
135 terminal_reason: LlmTerminalReason,
136 },
137}
138
139#[derive(Debug, Serialize, serde::Deserialize)]
146#[allow(clippy::large_enum_variant)]
147pub enum Effect<M: TurnProtocol = UnitTurnProtocol> {
148 SyncExecutionEnvironment {
156 id: EffectId,
157 update_machine_config: bool,
158 },
159 LlmCall {
161 id: EffectId,
162 request: Arc<LlmRequest>,
163 },
164 CancelLlm { id: EffectId },
166 ToolCalls {
168 id: EffectId,
169 calls: Vec<PendingToolCall>,
170 },
171 ExecCode {
173 id: EffectId,
174 language: String,
175 code: String,
176 },
177 Checkpoint {
179 id: EffectId,
180 checkpoint: CheckpointKind,
181 },
182 Log { event: LogEvent },
184 Emit(SessionEvent),
186 Progress {
192 messages: MessageSequence,
193 event_delta: Vec<SessionEventRecord<M::Event>>,
194 protocol_iteration: usize,
195 },
196 Done {
198 messages: MessageSequence,
199 event_delta: Vec<SessionEventRecord<M::Event>>,
200 protocol_iteration: usize,
201 },
202}
203
204impl<M: TurnProtocol> Clone for Effect<M> {
205 fn clone(&self) -> Self {
206 match self {
207 Self::SyncExecutionEnvironment {
208 id,
209 update_machine_config,
210 } => Self::SyncExecutionEnvironment {
211 id: *id,
212 update_machine_config: *update_machine_config,
213 },
214 Self::LlmCall { id, request } => Self::LlmCall {
215 id: *id,
216 request: Arc::clone(request),
217 },
218 Self::CancelLlm { id } => Self::CancelLlm { id: *id },
219 Self::ToolCalls { id, calls } => Self::ToolCalls {
220 id: *id,
221 calls: calls.clone(),
222 },
223 Self::ExecCode { id, language, code } => Self::ExecCode {
224 id: *id,
225 language: language.clone(),
226 code: code.clone(),
227 },
228 Self::Checkpoint { id, checkpoint } => Self::Checkpoint {
229 id: *id,
230 checkpoint: *checkpoint,
231 },
232 Self::Log { event } => Self::Log {
233 event: event.clone(),
234 },
235 Self::Emit(event) => Self::Emit(event.clone()),
236 Self::Progress {
237 messages,
238 event_delta,
239 protocol_iteration,
240 } => Self::Progress {
241 messages: messages.clone(),
242 event_delta: event_delta.clone(),
243 protocol_iteration: *protocol_iteration,
244 },
245 Self::Done {
246 messages,
247 event_delta,
248 protocol_iteration,
249 } => Self::Done {
250 messages: messages.clone(),
251 event_delta: event_delta.clone(),
252 protocol_iteration: *protocol_iteration,
253 },
254 }
255 }
256}
257
258impl<M: TurnProtocol> Effect<M> {
259 fn id(&self) -> Option<EffectId> {
260 match self {
261 Self::SyncExecutionEnvironment { id, .. }
262 | Self::LlmCall { id, .. }
263 | Self::CancelLlm { id }
264 | Self::ToolCalls { id, .. }
265 | Self::ExecCode { id, .. }
266 | Self::Checkpoint { id, .. } => Some(*id),
267 Self::Log { .. } | Self::Emit(_) | Self::Progress { .. } | Self::Done { .. } => None,
268 }
269 }
270}
271
272#[derive(Clone, Debug, Serialize, serde::Deserialize)]
274pub struct LlmCallError {
275 pub message: String,
276 pub retryable: bool,
277 #[serde(default)]
282 pub kind: crate::llm::types::ProviderFailureKind,
283 pub raw: Option<String>,
284 pub code: Option<String>,
285 pub terminal_reason: LlmTerminalReason,
286 pub request_body: Option<String>,
287}
288
289pub enum Response {
291 ExecutionEnvironmentSynced {
293 id: EffectId,
294 result: Result<Option<ExecutionEnvironmentSync>, String>,
295 },
296 LlmComplete {
298 id: EffectId,
299 result: Result<LlmResponse, LlmCallError>,
300 text_streamed: bool,
303 },
304 ToolResults {
306 id: EffectId,
307 results: Vec<CompletedToolCall>,
308 },
309 ExecResult {
311 id: EffectId,
312 result: Result<crate::ExecResponse, String>,
313 },
314 Checkpoint {
316 id: EffectId,
317 delivery: CheckpointDelivery,
318 },
319}
320
321#[derive(Clone, Debug, Serialize, serde::Deserialize)]
322pub struct ExecutionEnvironmentSync {
323 pub system_prompt: Arc<str>,
324 pub tool_specs: Arc<Vec<LlmToolSpec>>,
325}
326
327pub struct WaitingLlmState<M: TurnProtocol = UnitTurnProtocol> {
328 pub request: Arc<LlmRequest>,
329 driver_state: Option<M::DriverState>,
330}
331
332impl<M: TurnProtocol> WaitingLlmState<M> {
333 pub fn take_driver_state(&mut self) -> Option<M::DriverState> {
334 self.driver_state.take()
335 }
336}
337
338pub struct WaitingExecState<M: TurnProtocol = UnitTurnProtocol> {
339 driver_state: M::DriverState,
340}
341
342impl<M: TurnProtocol> WaitingExecState<M> {
343 pub fn into_driver_state(self) -> M::DriverState {
344 self.driver_state
345 }
346}
347
348#[derive(Clone, Debug, PartialEq, Serialize, serde::Deserialize)]
349pub enum CheckpointResumeAction {
350 PrepareIteration,
351 Finish(TurnOutcome),
352}
353
354#[allow(clippy::large_enum_variant)]
355pub enum DriverAction<M: TurnProtocol = UnitTurnProtocol> {
356 Emit(SessionEvent),
357 AppendEvents(Vec<SessionEventRecord<M::Event>>),
358 StartLlm {
359 request: Arc<LlmRequest>,
360 driver_state: Option<M::DriverState>,
361 },
362 StartTools {
363 calls: Vec<PendingToolCall>,
364 },
365 StartExec {
366 language: String,
367 code: String,
368 driver_state: M::DriverState,
369 },
370 StartCheckpoint {
371 checkpoint: CheckpointKind,
372 on_empty: CheckpointResumeAction,
373 },
374 AdvanceProtocolIteration,
375 ScheduleTurnLimitFinal {
376 message: Message,
377 },
378 Finish(TurnOutcome),
379}
380
381pub struct DriverContextView<'a, M: TurnProtocol = UnitTurnProtocol> {
382 config: &'a TurnMachineConfig<M>,
383 messages: &'a MessageSequence,
384 events: &'a [SessionEventRecord<M::Event>],
385 turn_causes: &'a [TurnCause],
386 protocol_iteration: usize,
387 protocol_run_offset: usize,
388 termination: &'a TurnTerminationPolicyState,
389}
390
391impl<'a, M: TurnProtocol> DriverContextView<'a, M> {
392 pub fn project_llm_request(&self, use_tools: bool) -> Arc<LlmRequest> {
393 self.config.projector.project(ProjectorContext {
394 config: self.config,
395 messages: self.messages,
396 events: self.events,
397 turn_causes: self.turn_causes,
398 protocol_iteration: self.protocol_iteration,
399 use_tools,
400 })
401 }
402
403 pub fn protocol_iteration(&self) -> usize {
404 self.protocol_iteration
405 }
406
407 pub fn protocol_run_offset(&self) -> usize {
408 self.protocol_run_offset
409 }
410
411 pub fn max_turns(&self) -> Option<usize> {
412 self.config.max_turns
413 }
414
415 pub fn termination(&self) -> &M::Termination {
416 &self.config.termination
417 }
418
419 pub fn autonomous(&self) -> bool {
420 self.config.autonomous
421 }
422
423 pub fn should_force_exit_after_grace_turn(&self) -> bool {
424 self.termination.should_force_exit_after_grace_turn()
425 }
426
427 pub fn turn_limit_final_to_schedule(&self) -> Option<usize> {
428 self.termination.turn_limit_final_to_schedule(
429 self.protocol_iteration,
430 self.protocol_run_offset,
431 self.config.max_turns,
432 )
433 }
434
435 pub fn messages(&self) -> &MessageSequence {
436 self.messages
437 }
438
439 pub fn events(&self) -> &[SessionEventRecord<M::Event>] {
440 self.events
441 }
442
443 pub fn turn_causes(&self) -> &[TurnCause] {
444 self.turn_causes
445 }
446}
447
448pub struct ProjectorContext<'a, M: TurnProtocol = UnitTurnProtocol> {
449 pub config: &'a TurnMachineConfig<M>,
450 pub messages: &'a MessageSequence,
451 pub events: &'a [SessionEventRecord<M::Event>],
452 pub turn_causes: &'a [TurnCause],
453 pub protocol_iteration: usize,
454 pub use_tools: bool,
455}
456
457pub trait ContextProjector<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
458 fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest>;
459}
460
/// Stateless [`ContextProjector`] that builds a chat-style request: optional
/// system prompt, the rendered messages, then an optional turn-events message.
#[derive(Clone, Debug, Default)]
pub struct ChatContextProjector;
463
464impl<M: TurnProtocol> ContextProjector<M> for ChatContextProjector {
465 fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest> {
466 let rendered_prompt = render_messages_for_projector(ctx.messages, ctx.turn_causes);
467 let attachments: Vec<LlmAttachment> = rendered_prompt.attachments;
468 let mut messages = rendered_prompt.messages;
469 if let Some(turn_events) = render_turn_causes_prompt(ctx.turn_causes) {
470 messages.push(crate::llm::types::LlmMessage::text(
471 crate::llm::types::LlmRole::User,
472 Arc::from(turn_events),
473 ));
474 }
475 if !ctx.config.system_prompt.trim().is_empty() {
476 messages.insert(
477 0,
478 crate::llm::types::LlmMessage::text(
479 crate::llm::types::LlmRole::System,
480 Arc::clone(&ctx.config.system_prompt),
481 ),
482 );
483 }
484
485 Arc::new(LlmRequest {
486 model: ctx.config.model.clone(),
487 messages,
488 attachments,
489 tools: if ctx.use_tools {
490 Arc::clone(&ctx.config.tool_specs)
491 } else {
492 Arc::new(Vec::new())
493 },
494 tool_choice: if ctx.use_tools {
495 LlmToolChoice::Auto
496 } else {
497 LlmToolChoice::None
498 },
499 model_variant: ctx.config.model_variant.clone(),
500 generation: ctx.config.generation.clone(),
501 scope: crate::llm::types::LlmRequestScope::new(
502 ctx.config.session_id.clone(),
503 format!("{}:frame:sansio", ctx.config.session_id),
504 format!(
505 "{}:sansio:llm:{}",
506 ctx.config.session_id, ctx.protocol_iteration
507 ),
508 ),
509 output_spec: None,
510 stream_events: None,
511 provider_trace: None,
512 })
513 }
514}
515
516fn render_messages_for_projector(
517 messages: &MessageSequence,
518 turn_causes: &[TurnCause],
519) -> crate::RenderedPrompt {
520 if turn_causes.is_empty() {
521 return messages.render_prompt();
522 }
523
524 let active_cause_ids = turn_causes
525 .iter()
526 .map(|cause| cause.id.as_str())
527 .collect::<HashSet<_>>();
528 let filtered = messages
529 .iter()
530 .filter(|message| {
531 !(matches!(message.role, MessageRole::Event)
532 && active_cause_ids.contains(message.id.as_str()))
533 })
534 .cloned()
535 .collect::<Vec<_>>();
536 render_prompt(filtered.as_slice())
537}
538
539pub trait ProtocolDriverHandle<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
540 fn prepare_protocol_iteration(&self, ctx: DriverContextView<'_, M>) -> Vec<DriverAction<M>>;
541 fn handle_llm_success(
542 &self,
543 ctx: DriverContextView<'_, M>,
544 waiting: WaitingLlmState<M>,
545 llm_response: LlmResponse,
546 text_streamed: bool,
547 ) -> Vec<DriverAction<M>>;
548 fn handle_tool_results(
549 &self,
550 ctx: DriverContextView<'_, M>,
551 completed: Vec<CompletedToolCall>,
552 ) -> Vec<DriverAction<M>>;
553 fn handle_exec_result(
554 &self,
555 ctx: DriverContextView<'_, M>,
556 waiting: WaitingExecState<M>,
557 result: Result<crate::ExecResponse, String>,
558 ) -> Vec<DriverAction<M>>;
559}
560
561pub struct TurnMachineConfig<M: TurnProtocol = UnitTurnProtocol> {
563 pub protocol_driver: Arc<dyn ProtocolDriverHandle<M>>,
564 pub projector: Arc<dyn ContextProjector<M>>,
565 pub sync_execution_environment: bool,
566 pub model: String,
567 pub max_context_tokens: Option<usize>,
572 pub max_turns: Option<usize>,
573 pub model_variant: Option<String>,
574 pub generation: crate::llm::types::GenerationOptions,
575 pub autonomous: bool,
576 pub tool_specs: Arc<Vec<LlmToolSpec>>,
577 pub system_prompt: Arc<str>,
578 pub session_id: String,
579 pub emit_llm_trace: bool,
580 pub termination: M::Termination,
581 pub turn_limit_final_message: crate::TurnLimitFinalMessage,
582}
583
#[cfg(test)]
mod llm_call_error_tests {
    use super::LlmCallError;
    use crate::llm::types::ProviderFailureKind;

    /// A serialized error without a `kind` key must still decode, with `kind`
    /// falling back to `ProviderFailureKind::Unknown`.
    #[test]
    fn llm_call_error_decodes_journal_entries_that_predate_kind() {
        // The `kind` key is deliberately absent from this payload.
        let legacy = r#"{
            "message":"rate limited",
            "retryable":true,
            "raw":null,
            "code":"429",
            "terminal_reason":"provider_error",
            "request_body":null
        }"#;

        let decoded = serde_json::from_str::<LlmCallError>(legacy).expect("legacy call error");

        assert!(decoded.retryable);
        assert_eq!(decoded.kind, ProviderFailureKind::Unknown);
    }
}
607
608