/// [`TurnProtocol`] implementation for [`UnitTurnProtocol`], the default `M`
/// of the generic types in this file.
///
/// Events and termination data are the unit type; driver state is an untyped
/// JSON value.
impl TurnProtocol for UnitTurnProtocol {
    type Event = ();
    type Termination = ();
    type DriverState = serde_json::Value;
}
6
/// Numeric identifier carried by the id-bearing [`Effect`] variants and by
/// every [`Response`] variant.
///
/// NOTE(review): presumably a response's id echoes the effect it answers —
/// confirm against the code that pairs them.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, serde::Deserialize)]
pub struct EffectId(pub u64);
10
/// A requested tool invocation that does not carry a result yet.
///
/// Travels in [`Effect::ToolCalls`] and [`DriverAction::StartTools`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct PendingToolCall {
    /// Identifier of this particular call.
    pub call_id: String,
    /// Name of the tool to invoke.
    pub tool_name: String,
    /// Arguments for the tool.
    pub args: Value,
    /// Optional provider replay metadata.
    /// NOTE(review): semantics live in `ProviderReplayMeta`, not visible here.
    pub replay: Option<ProviderReplayMeta>,
}
19
/// A tool invocation together with its outcome.
///
/// Travels in [`Response::ToolResults`] and is handed to
/// [`ProtocolDriverHandle::handle_tool_results`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct CompletedToolCall {
    /// Identifier of the call this result belongs to.
    pub call_id: String,
    /// Name of the tool that was invoked.
    pub tool_name: String,
    /// Arguments the tool was invoked with.
    pub args: Value,
    /// Output produced by the tool.
    pub output: ToolCallOutput,
    /// NOTE(review): presumably the form of the result shown to the model —
    /// confirm against `ModelToolReturn`.
    pub model_return: ModelToolReturn,
    /// Duration of the call in milliseconds (per the field name).
    pub duration_ms: u64,
    /// Optional provider replay metadata.
    /// NOTE(review): semantics live in `ProviderReplayMeta`, not visible here.
    pub replay: Option<ProviderReplayMeta>,
}
31
/// An event associated with a turn.
///
/// Can be turned into an event-role [`Message`] via
/// [`TurnCause::to_event_message`] and is rendered into the
/// `=== TURN EVENTS ===` prompt section by [`render_turn_causes_prompt`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, serde::Deserialize)]
pub struct TurnCause {
    /// Identifier of the cause; reused as the id of the derived event message.
    pub id: String,
    /// Event type label shown in the rendered section header.
    pub event_type: String,
    /// Where the event came from.
    pub origin: MessageOrigin,
    /// Event body text.
    pub text: String,
}
39
40impl TurnCause {
41 pub fn to_event_message(&self) -> Message {
42 Message {
43 id: self.id.clone(),
44 role: MessageRole::Event,
45 parts: Arc::new(vec![Part {
46 id: format!("{}.p0", self.id),
47 kind: PartKind::Text,
48 content: self.text.clone(),
49 attachment: None,
50 tool_call_id: None,
51 tool_name: None,
52 tool_replay: None,
53 prune_state: PruneState::Intact,
54 reasoning_meta: None,
55 response_meta: None,
56 }]),
57 origin: Some(self.origin.clone()),
58 }
59 }
60}
61
/// Payload of [`Response::Checkpoint`]: what a checkpoint delivered.
///
/// The derived `Default` is the empty delivery (all three lists empty).
#[derive(Clone, Debug, Default, Serialize, serde::Deserialize)]
pub struct CheckpointDelivery {
    /// Plugin messages delivered at the checkpoint.
    pub messages: Vec<PluginMessage>,
    /// Plugin messages delivered as transient.
    /// NOTE(review): presumably not persisted in history — confirm with the consumer.
    pub transient_messages: Vec<PluginMessage>,
    /// Turn causes delivered at the checkpoint.
    pub turn_causes: Vec<TurnCause>,
}
68
69pub fn render_turn_causes_prompt(causes: &[TurnCause]) -> Option<String> {
70 if causes.is_empty() {
71 return None;
72 }
73
74 let mut rendered = String::from("=== TURN EVENTS ===");
75 for (index, cause) in causes.iter().enumerate() {
76 rendered.push_str("\n\n");
77 rendered.push_str(&format!(
78 "--- event[{index}] · {} · {} ---\n",
79 cause.event_type, cause.id
80 ));
81 rendered.push_str("Origin: ");
82 rendered.push_str(&render_message_origin(&cause.origin));
83 rendered.push_str("\n\n");
84 rendered.push_str(cause.text.trim());
85 }
86 Some(rendered)
87}
88
89fn render_message_origin(origin: &MessageOrigin) -> String {
90 match origin {
91 MessageOrigin::Plugin {
92 plugin_id,
93 transient,
94 } => {
95 if *transient {
96 format!("plugin {plugin_id} (transient)")
97 } else {
98 format!("plugin {plugin_id}")
99 }
100 }
101 MessageOrigin::Process {
102 process_id,
103 event_type,
104 sequence,
105 wake_id,
106 ..
107 } => match wake_id {
108 Some(wake_id) => {
109 format!("process {process_id} {event_type} #{sequence} ({wake_id})")
110 }
111 None => format!("process {process_id} {event_type} #{sequence}"),
112 },
113 }
114}
115
/// Diagnostic record carried by [`Effect::Log`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub enum LogEvent {
    /// Debug record of an LLM exchange.
    LlmDebug {
        /// Session the exchange belongs to.
        session_id: String,
        /// Protocol iteration the exchange belongs to.
        protocol_iteration: usize,
        /// Token usage for the exchange.
        usage: TokenUsage,
        /// Provider usage payload, when available.
        provider_usage: Option<Value>,
        /// Request body, when captured.
        request_body: Option<String>,
        /// Response text.
        response_text: String,
        /// Response parts payload, when available.
        response_parts: Option<Value>,
    },
    /// Record of a failed LLM call. Apart from the session and iteration, the
    /// fields match those of [`LlmCallError`].
    LlmError {
        /// Session the failed call belongs to.
        session_id: String,
        /// Protocol iteration the failed call belongs to.
        protocol_iteration: usize,
        /// Request body, when captured.
        request_body: Option<String>,
        /// Error message.
        message: String,
        /// Whether the error is flagged as retryable.
        retryable: bool,
        /// Raw error payload, when available.
        raw: Option<String>,
        /// Error code, when available.
        code: Option<String>,
        /// Terminal reason classification for the failure.
        terminal_reason: LlmTerminalReason,
    },
}
138
/// Effects of a turn, generic over the protocol `M`.
///
/// The first six variants carry an [`EffectId`]; `Log`, `Emit`, `Progress`
/// and `Done` do not (see `Effect::id`).
///
/// NOTE(review): presumably emitted by the turn machine for its host to
/// execute, with id-bearing effects answered by the [`Response`] variant of
/// the same id — the producer and consumer are not visible in this part of
/// the file.
#[derive(Debug, Serialize, serde::Deserialize)]
#[allow(clippy::large_enum_variant)]
pub enum Effect<M: TurnProtocol = UnitTurnProtocol> {
    /// Execution-environment sync request.
    SyncExecutionEnvironment {
        /// Identifier of this effect.
        id: EffectId,
        /// NOTE(review): presumably asks for the machine config to be updated
        /// from the sync result — confirm with the handler.
        update_machine_config: bool,
    },
    /// LLM call with the given request.
    LlmCall {
        /// Identifier of this effect.
        id: EffectId,
        /// The request to send; shared via `Arc`.
        request: Arc<LlmRequest>,
    },
    /// Cancellation of an LLM call.
    CancelLlm { id: EffectId },
    /// Batch of tool calls to run.
    ToolCalls {
        /// Identifier of this effect.
        id: EffectId,
        /// The calls to run.
        calls: Vec<PendingToolCall>,
    },
    /// Code to execute.
    ExecCode { id: EffectId, code: String },
    /// Checkpoint of the given kind.
    Checkpoint {
        /// Identifier of this effect.
        id: EffectId,
        /// Which kind of checkpoint this is.
        checkpoint: CheckpointKind,
    },
    /// Diagnostic log record.
    Log { event: LogEvent },
    /// Session event to emit.
    Emit(SessionEvent),
    /// Progress snapshot: messages, an event delta and the protocol iteration.
    Progress {
        /// Message sequence at this point.
        messages: MessageSequence,
        /// Event records in the delta.
        /// NOTE(review): presumably those added since the previous snapshot.
        event_delta: Vec<SessionEventRecord<M::Event>>,
        /// Protocol iteration at this point.
        protocol_iteration: usize,
    },
    /// Same payload shape as `Progress`.
    /// NOTE(review): presumably marks the end of the turn — confirm.
    Done {
        /// Message sequence at this point.
        messages: MessageSequence,
        /// Event records in the delta.
        event_delta: Vec<SessionEventRecord<M::Event>>,
        /// Protocol iteration at this point.
        protocol_iteration: usize,
    },
}
199
200impl<M: TurnProtocol> Clone for Effect<M> {
201 fn clone(&self) -> Self {
202 match self {
203 Self::SyncExecutionEnvironment {
204 id,
205 update_machine_config,
206 } => Self::SyncExecutionEnvironment {
207 id: *id,
208 update_machine_config: *update_machine_config,
209 },
210 Self::LlmCall { id, request } => Self::LlmCall {
211 id: *id,
212 request: Arc::clone(request),
213 },
214 Self::CancelLlm { id } => Self::CancelLlm { id: *id },
215 Self::ToolCalls { id, calls } => Self::ToolCalls {
216 id: *id,
217 calls: calls.clone(),
218 },
219 Self::ExecCode { id, code } => Self::ExecCode {
220 id: *id,
221 code: code.clone(),
222 },
223 Self::Checkpoint { id, checkpoint } => Self::Checkpoint {
224 id: *id,
225 checkpoint: *checkpoint,
226 },
227 Self::Log { event } => Self::Log {
228 event: event.clone(),
229 },
230 Self::Emit(event) => Self::Emit(event.clone()),
231 Self::Progress {
232 messages,
233 event_delta,
234 protocol_iteration,
235 } => Self::Progress {
236 messages: messages.clone(),
237 event_delta: event_delta.clone(),
238 protocol_iteration: *protocol_iteration,
239 },
240 Self::Done {
241 messages,
242 event_delta,
243 protocol_iteration,
244 } => Self::Done {
245 messages: messages.clone(),
246 event_delta: event_delta.clone(),
247 protocol_iteration: *protocol_iteration,
248 },
249 }
250 }
251}
252
253impl<M: TurnProtocol> Effect<M> {
254 fn id(&self) -> Option<EffectId> {
255 match self {
256 Self::SyncExecutionEnvironment { id, .. }
257 | Self::LlmCall { id, .. }
258 | Self::CancelLlm { id }
259 | Self::ToolCalls { id, .. }
260 | Self::ExecCode { id, .. }
261 | Self::Checkpoint { id, .. } => Some(*id),
262 Self::Log { .. } | Self::Emit(_) | Self::Progress { .. } | Self::Done { .. } => None,
263 }
264 }
265}
266
/// Error side of the result in [`Response::LlmComplete`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct LlmCallError {
    /// Error message.
    pub message: String,
    /// Whether the error is flagged as retryable.
    pub retryable: bool,
    /// Raw error payload, when available.
    pub raw: Option<String>,
    /// Error code, when available.
    pub code: Option<String>,
    /// Terminal reason classification for the failure.
    pub terminal_reason: LlmTerminalReason,
    /// Request body, when captured.
    pub request_body: Option<String>,
}
277
/// Results reported back for effects; every variant carries an [`EffectId`].
///
/// NOTE(review): presumably each variant answers the [`Effect`] of the
/// corresponding kind and echoes its id — the pairing logic is not visible in
/// this part of the file.
pub enum Response {
    /// Outcome of an execution-environment sync.
    ExecutionEnvironmentSynced {
        /// Identifier this response refers to.
        id: EffectId,
        /// `Ok` with optional new environment data, or an error string.
        result: Result<Option<ExecutionEnvironmentSync>, String>,
    },
    /// Outcome of an LLM call.
    LlmComplete {
        /// Identifier this response refers to.
        id: EffectId,
        /// The LLM response, or the call error.
        result: Result<LlmResponse, LlmCallError>,
        /// Passed through to `ProtocolDriverHandle::handle_llm_success`.
        /// NOTE(review): presumably true when the text was already streamed out.
        text_streamed: bool,
    },
    /// Results of a batch of tool calls.
    ToolResults {
        /// Identifier this response refers to.
        id: EffectId,
        /// The completed calls.
        results: Vec<CompletedToolCall>,
    },
    /// Outcome of a code execution.
    ExecResult {
        /// Identifier this response refers to.
        id: EffectId,
        /// The execution response, or an error string.
        result: Result<crate::ExecResponse, String>,
    },
    /// What a checkpoint delivered.
    Checkpoint {
        /// Identifier this response refers to.
        id: EffectId,
        /// The delivered messages and turn causes.
        delivery: CheckpointDelivery,
    },
}
309
/// Data returned by a successful execution-environment sync (see
/// [`Response::ExecutionEnvironmentSynced`]).
///
/// The two fields have the same types as the `system_prompt` and `tool_specs`
/// fields of [`TurnMachineConfig`].
/// NOTE(review): presumably they replace those config fields — confirm with
/// the response handler.
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct ExecutionEnvironmentSync {
    /// System prompt text.
    pub system_prompt: Arc<str>,
    /// Tool specifications.
    pub tool_specs: Arc<Vec<LlmToolSpec>>,
}
315
/// State handed to [`ProtocolDriverHandle::handle_llm_success`]: the request
/// plus optional driver state.
///
/// NOTE(review): presumably kept by the machine while an LLM call is in flight.
pub struct WaitingLlmState<M: TurnProtocol = UnitTurnProtocol> {
    /// The LLM request.
    pub request: Arc<LlmRequest>,
    /// Optional driver state; private, retrieved through `take_driver_state`.
    driver_state: Option<M::DriverState>,
}
320
321impl<M: TurnProtocol> WaitingLlmState<M> {
322 pub fn take_driver_state(&mut self) -> Option<M::DriverState> {
323 self.driver_state.take()
324 }
325}
326
/// State handed to [`ProtocolDriverHandle::handle_exec_result`]: the driver
/// state for the execution.
///
/// NOTE(review): presumably the `driver_state` given to
/// [`DriverAction::StartExec`] — confirm with the machine.
pub struct WaitingExecState<M: TurnProtocol = UnitTurnProtocol> {
    /// Driver state; private, retrieved through `into_driver_state`.
    driver_state: M::DriverState,
}
330
331impl<M: TurnProtocol> WaitingExecState<M> {
332 pub fn into_driver_state(self) -> M::DriverState {
333 self.driver_state
334 }
335}
336
/// Follow-up action attached to [`DriverAction::StartCheckpoint`] as `on_empty`.
///
/// NOTE(review): presumably what the machine does when the checkpoint
/// delivers nothing — confirm with the checkpoint handler.
#[derive(Clone, Debug, PartialEq, Serialize, serde::Deserialize)]
pub enum CheckpointResumeAction {
    /// Go on to prepare an iteration.
    PrepareIteration,
    /// Finish with the given outcome.
    Finish(TurnOutcome),
}
342
/// Actions returned by the [`ProtocolDriverHandle`] methods.
///
/// NOTE(review): presumably the turn machine interprets these and turns the
/// `Start*` variants into the corresponding [`Effect`]s — the interpreter is
/// not visible in this part of the file.
#[allow(clippy::large_enum_variant)]
pub enum DriverAction<M: TurnProtocol = UnitTurnProtocol> {
    /// Emit a session event.
    Emit(SessionEvent),
    /// Append event records.
    AppendEvents(Vec<SessionEventRecord<M::Event>>),
    /// Start an LLM call.
    StartLlm {
        /// The request to send.
        request: Arc<LlmRequest>,
        /// Optional driver state to accompany the call.
        driver_state: Option<M::DriverState>,
    },
    /// Start a batch of tool calls.
    StartTools {
        /// The calls to run.
        calls: Vec<PendingToolCall>,
    },
    /// Start a code execution.
    StartExec {
        /// The code to execute.
        code: String,
        /// Driver state to accompany the execution.
        driver_state: M::DriverState,
    },
    /// Start a checkpoint.
    StartCheckpoint {
        /// Which kind of checkpoint to start.
        checkpoint: CheckpointKind,
        /// Follow-up action; see [`CheckpointResumeAction`].
        on_empty: CheckpointResumeAction,
    },
    /// Advance the protocol iteration.
    AdvanceProtocolIteration,
    /// Schedule the turn-limit final step with the given message.
    ScheduleTurnLimitFinal {
        /// Message associated with the turn-limit final step.
        message: Message,
    },
    /// Finish with the given outcome.
    Finish(TurnOutcome),
}
368
/// Borrowed view passed to the [`ProtocolDriverHandle`] methods.
///
/// All fields are private; drivers read them through the accessor methods.
pub struct DriverContextView<'a, M: TurnProtocol = UnitTurnProtocol> {
    /// Machine configuration.
    config: &'a TurnMachineConfig<M>,
    /// Current message sequence.
    messages: &'a MessageSequence,
    /// Current event records.
    events: &'a [SessionEventRecord<M::Event>],
    /// Current turn causes.
    turn_causes: &'a [TurnCause],
    /// Current protocol iteration.
    protocol_iteration: usize,
    /// Offset passed to `turn_limit_final_to_schedule` with the iteration.
    protocol_run_offset: usize,
    /// Termination policy state. This is separate from `config.termination`,
    /// which is the protocol's `M::Termination` value.
    termination: &'a TurnTerminationPolicyState,
}
378
379impl<'a, M: TurnProtocol> DriverContextView<'a, M> {
380 pub fn project_llm_request(&self, use_tools: bool) -> Arc<LlmRequest> {
381 self.config.projector.project(ProjectorContext {
382 config: self.config,
383 messages: self.messages,
384 events: self.events,
385 turn_causes: self.turn_causes,
386 protocol_iteration: self.protocol_iteration,
387 use_tools,
388 })
389 }
390
391 pub fn protocol_iteration(&self) -> usize {
392 self.protocol_iteration
393 }
394
395 pub fn protocol_run_offset(&self) -> usize {
396 self.protocol_run_offset
397 }
398
399 pub fn max_turns(&self) -> Option<usize> {
400 self.config.max_turns
401 }
402
403 pub fn termination(&self) -> &M::Termination {
404 &self.config.termination
405 }
406
407 pub fn autonomous(&self) -> bool {
408 self.config.autonomous
409 }
410
411 pub fn should_force_exit_after_grace_turn(&self) -> bool {
412 self.termination.should_force_exit_after_grace_turn()
413 }
414
415 pub fn turn_limit_final_to_schedule(&self) -> Option<usize> {
416 self.termination.turn_limit_final_to_schedule(
417 self.protocol_iteration,
418 self.protocol_run_offset,
419 self.config.max_turns,
420 )
421 }
422
423 pub fn messages(&self) -> &MessageSequence {
424 self.messages
425 }
426
427 pub fn events(&self) -> &[SessionEventRecord<M::Event>] {
428 self.events
429 }
430
431 pub fn turn_causes(&self) -> &[TurnCause] {
432 self.turn_causes
433 }
434}
435
/// Input handed to [`ContextProjector::project`].
pub struct ProjectorContext<'a, M: TurnProtocol = UnitTurnProtocol> {
    /// Machine configuration.
    pub config: &'a TurnMachineConfig<M>,
    /// Message sequence to project.
    pub messages: &'a MessageSequence,
    /// Event records available to the projector.
    pub events: &'a [SessionEventRecord<M::Event>],
    /// Turn causes available to the projector.
    pub turn_causes: &'a [TurnCause],
    /// Current protocol iteration.
    pub protocol_iteration: usize,
    /// Whether the projected request should offer tools.
    pub use_tools: bool,
}
444
/// Turns a [`ProjectorContext`] into an [`LlmRequest`].
///
/// Implementations must be `Send + Sync`; [`TurnMachineConfig`] stores the
/// projector as `Arc<dyn ContextProjector<M>>`.
pub trait ContextProjector<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
    /// Builds the request for the given context.
    fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest>;
}
448
/// Stateless [`ContextProjector`] implementation; see its `project` method
/// for how the request is assembled.
#[derive(Clone, Debug, Default)]
pub struct ChatContextProjector;
451
452impl<M: TurnProtocol> ContextProjector<M> for ChatContextProjector {
453 fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest> {
454 let rendered_prompt = render_messages_for_projector(ctx.messages, ctx.turn_causes);
455 let attachments: Vec<LlmAttachment> = rendered_prompt.attachments;
456 let mut messages = rendered_prompt.messages;
457 if let Some(turn_events) = render_turn_causes_prompt(ctx.turn_causes) {
458 messages.push(crate::llm::types::LlmMessage::text(
459 crate::llm::types::LlmRole::User,
460 Arc::from(turn_events),
461 ));
462 }
463 if !ctx.config.system_prompt.trim().is_empty() {
464 messages.insert(
465 0,
466 crate::llm::types::LlmMessage::text(
467 crate::llm::types::LlmRole::System,
468 Arc::clone(&ctx.config.system_prompt),
469 ),
470 );
471 }
472
473 Arc::new(LlmRequest {
474 model: ctx.config.model.clone(),
475 messages,
476 attachments,
477 tools: if ctx.use_tools {
478 Arc::clone(&ctx.config.tool_specs)
479 } else {
480 Arc::new(Vec::new())
481 },
482 tool_choice: if ctx.use_tools {
483 LlmToolChoice::Auto
484 } else {
485 LlmToolChoice::None
486 },
487 model_variant: ctx.config.model_variant.clone(),
488 generation: ctx.config.generation.clone(),
489 session_id: ctx.config.run_session_id.clone(),
490 output_spec: None,
491 stream_events: None,
492 provider_trace: None,
493 })
494 }
495}
496
497fn render_messages_for_projector(
498 messages: &MessageSequence,
499 turn_causes: &[TurnCause],
500) -> crate::RenderedPrompt {
501 if turn_causes.is_empty() {
502 return messages.render_prompt();
503 }
504
505 let active_cause_ids = turn_causes
506 .iter()
507 .map(|cause| cause.id.as_str())
508 .collect::<HashSet<_>>();
509 let filtered = messages
510 .iter()
511 .filter(|message| {
512 !(matches!(message.role, MessageRole::Event)
513 && active_cause_ids.contains(message.id.as_str()))
514 })
515 .cloned()
516 .collect::<Vec<_>>();
517 render_prompt(filtered.as_slice())
518}
519
/// Protocol-specific driver interface.
///
/// Every method receives a [`DriverContextView`] and returns a list of
/// [`DriverAction`]s. Implementations must be `Send + Sync`;
/// [`TurnMachineConfig`] stores the driver as
/// `Arc<dyn ProtocolDriverHandle<M>>`.
///
/// NOTE(review): the points at which the machine calls each method are
/// inferred from the method names; the caller is not visible in this part of
/// the file.
pub trait ProtocolDriverHandle<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
    /// Returns the actions for preparing a protocol iteration.
    fn prepare_protocol_iteration(&self, ctx: DriverContextView<'_, M>) -> Vec<DriverAction<M>>;
    /// Returns the actions for a successful LLM response.
    ///
    /// `waiting` carries the request and any driver state; `text_streamed`
    /// has the same name as the flag on `Response::LlmComplete`.
    fn handle_llm_success(
        &self,
        ctx: DriverContextView<'_, M>,
        waiting: WaitingLlmState<M>,
        llm_response: LlmResponse,
        text_streamed: bool,
    ) -> Vec<DriverAction<M>>;
    /// Returns the actions for a batch of completed tool calls.
    fn handle_tool_results(
        &self,
        ctx: DriverContextView<'_, M>,
        completed: Vec<CompletedToolCall>,
    ) -> Vec<DriverAction<M>>;
    /// Returns the actions for a code-execution result, which may be an error
    /// string. `waiting` carries the driver state for the execution.
    fn handle_exec_result(
        &self,
        ctx: DriverContextView<'_, M>,
        waiting: WaitingExecState<M>,
        result: Result<crate::ExecResponse, String>,
    ) -> Vec<DriverAction<M>>;
}
541
/// Configuration shared by the turn machine, its driver and its projector.
pub struct TurnMachineConfig<M: TurnProtocol = UnitTurnProtocol> {
    /// Protocol driver.
    pub protocol_driver: Arc<dyn ProtocolDriverHandle<M>>,
    /// Projector used by `DriverContextView::project_llm_request`.
    pub projector: Arc<dyn ContextProjector<M>>,
    /// NOTE(review): presumably enables the execution-environment sync step —
    /// confirm with the machine.
    pub sync_execution_environment: bool,
    /// Model name; copied into projected requests.
    pub model: String,
    /// Optional context-size limit in tokens (per the field name); not read
    /// in this part of the file.
    pub max_context_tokens: Option<usize>,
    /// Optional turn limit; passed to the termination policy state by
    /// `DriverContextView::turn_limit_final_to_schedule`.
    pub max_turns: Option<usize>,
    /// Optional model variant; copied into projected requests.
    pub model_variant: Option<String>,
    /// Generation options; copied into projected requests.
    pub generation: crate::llm::types::GenerationOptions,
    /// Optional run session id; becomes `session_id` on projected requests.
    pub run_session_id: Option<String>,
    /// Autonomous flag, exposed through `DriverContextView::autonomous`.
    pub autonomous: bool,
    /// Tool specifications offered when a request is projected with tools.
    pub tool_specs: Arc<Vec<LlmToolSpec>>,
    /// System prompt; `ChatContextProjector` puts it first unless it is blank.
    pub system_prompt: Arc<str>,
    /// Session id. This is a separate field from `run_session_id`.
    pub session_id: String,
    /// NOTE(review): presumably toggles LLM trace logging — confirm with the machine.
    pub emit_llm_trace: bool,
    /// Protocol termination value, exposed through `DriverContextView::termination`.
    pub termination: M::Termination,
    /// Turn-limit final message setting; not read in this part of the file.
    pub turn_limit_final_message: crate::TurnLimitFinalMessage,
}
565
566