// Binds the associated types of `TurnProtocol` for `UnitTurnProtocol`.
impl TurnProtocol for UnitTurnProtocol {
    /// The unit protocol carries no protocol-specific event payload.
    type Event = ();
    /// The unit protocol carries no protocol-specific termination data.
    type Termination = ();
    /// Driver state is held as an untyped JSON value.
    type DriverState = serde_json::Value;
}
6
/// Numeric identifier carried by the id-bearing [`Effect`] variants and by
/// every [`Response`] variant.
///
/// NOTE(review): presumably used to pair a response with the effect that
/// requested it — confirm against the code that consumes `Response`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, serde::Deserialize)]
pub struct EffectId(pub u64);
10
/// A tool invocation as carried by [`Effect::ToolCalls`] and
/// [`DriverAction::StartTools`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct PendingToolCall {
    /// Identifier of this call; [`CompletedToolCall`] has a field of the same name.
    pub call_id: String,
    /// Name of the tool to invoke.
    pub tool_name: String,
    /// Arguments for the tool, held as an untyped `Value`.
    pub args: Value,
    /// Optional provider replay metadata; its meaning is defined by
    /// `ProviderReplayMeta`, which is declared elsewhere.
    pub replay: Option<ProviderReplayMeta>,
}
19
/// A tool invocation together with its outcome, as carried by
/// [`Response::ToolResults`].
///
/// The first three fields and `replay` mirror those of [`PendingToolCall`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct CompletedToolCall {
    /// Identifier of the call.
    pub call_id: String,
    /// Name of the tool that was invoked.
    pub tool_name: String,
    /// Arguments the tool was invoked with, held as an untyped `Value`.
    pub args: Value,
    /// Output of the tool call (see `ToolCallOutput`, declared elsewhere).
    pub output: ToolCallOutput,
    /// NOTE(review): presumably the representation handed back to the model,
    /// as opposed to the raw `output` — confirm against `ModelToolReturn`.
    pub model_return: ModelToolReturn,
    /// Duration of the call; milliseconds according to the field name.
    pub duration_ms: u64,
    /// Optional provider replay metadata (see `ProviderReplayMeta`).
    pub replay: Option<ProviderReplayMeta>,
}
31
/// An event associated with a turn, identified by `id`.
///
/// A cause can be converted into an event-role [`Message`] with
/// `TurnCause::to_event_message`, and a slice of causes is rendered into a
/// prompt section by [`render_turn_causes_prompt`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, serde::Deserialize)]
pub struct TurnCause {
    /// Identifier of the cause; reused as the id of the derived event message.
    pub id: String,
    /// Event type label shown in the rendered section header.
    pub event_type: String,
    /// Where the event came from.
    pub origin: MessageOrigin,
    /// Event text; trimmed when rendered into the prompt section.
    pub text: String,
}
39
40impl TurnCause {
41 pub fn to_event_message(&self) -> Message {
42 Message {
43 id: self.id.clone(),
44 role: MessageRole::Event,
45 parts: Arc::new(vec![Part {
46 id: format!("{}.p0", self.id),
47 kind: PartKind::Text,
48 content: self.text.clone(),
49 attachment: None,
50 tool_call_id: None,
51 tool_name: None,
52 tool_replay: None,
53 prune_state: PruneState::Intact,
54 reasoning_meta: None,
55 response_meta: None,
56 }]),
57 origin: Some(self.origin.clone()),
58 }
59 }
60}
61
/// Payload of [`Response::Checkpoint`]: what a checkpoint delivered.
///
/// `Default` yields three empty lists.
#[derive(Clone, Debug, Default, Serialize, serde::Deserialize)]
pub struct CheckpointDelivery {
    /// Plugin messages delivered at the checkpoint.
    pub messages: Vec<PluginMessage>,
    /// Plugin messages delivered as transient.
    /// NOTE(review): the exact lifetime of "transient" messages is not visible
    /// here — confirm against the consumer.
    pub transient_messages: Vec<PluginMessage>,
    /// Turn causes delivered at the checkpoint.
    pub turn_causes: Vec<TurnCause>,
}
68
69pub fn render_turn_causes_prompt(causes: &[TurnCause]) -> Option<String> {
70 if causes.is_empty() {
71 return None;
72 }
73
74 let mut rendered = String::from("=== TURN EVENTS ===");
75 for (index, cause) in causes.iter().enumerate() {
76 rendered.push_str("\n\n");
77 rendered.push_str(&format!(
78 "--- event[{index}] · {} · {} ---\n",
79 cause.event_type, cause.id
80 ));
81 rendered.push_str("Origin: ");
82 rendered.push_str(&render_message_origin(&cause.origin));
83 rendered.push_str("\n\n");
84 rendered.push_str(cause.text.trim());
85 }
86 Some(rendered)
87}
88
89fn render_message_origin(origin: &MessageOrigin) -> String {
90 match origin {
91 MessageOrigin::Plugin {
92 plugin_id,
93 transient,
94 } => {
95 if *transient {
96 format!("plugin {plugin_id} (transient)")
97 } else {
98 format!("plugin {plugin_id}")
99 }
100 }
101 MessageOrigin::Process {
102 process_id,
103 event_type,
104 sequence,
105 wake_id,
106 ..
107 } => match wake_id {
108 Some(wake_id) => {
109 format!("process {process_id} {event_type} #{sequence} ({wake_id})")
110 }
111 None => format!("process {process_id} {event_type} #{sequence}"),
112 },
113 }
114}
115
/// Diagnostic records about LLM calls, carried by [`Effect::Log`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub enum LogEvent {
    /// Debug record describing an LLM exchange.
    LlmDebug {
        /// Session the call belongs to.
        session_id: String,
        /// Protocol iteration the call belongs to.
        protocol_iteration: usize,
        /// Token usage for the call.
        usage: TokenUsage,
        /// Usage data in untyped form, when available.
        /// NOTE(review): presumably the provider's raw usage payload — confirm.
        provider_usage: Option<Value>,
        /// Request body, when it was captured.
        request_body: Option<String>,
        /// Text of the response.
        response_text: String,
        /// Response parts in untyped form, when available.
        response_parts: Option<Value>,
    },
    /// Record describing a failed LLM call; the fields after
    /// `protocol_iteration` parallel those of [`LlmCallError`].
    LlmError {
        /// Session the call belongs to.
        session_id: String,
        /// Protocol iteration the call belongs to.
        protocol_iteration: usize,
        /// Request body, when it was captured.
        request_body: Option<String>,
        /// Error message.
        message: String,
        /// Whether the failure is flagged as retryable.
        retryable: bool,
        /// Raw error text, when available.
        raw: Option<String>,
        /// Error code, when available.
        code: Option<String>,
        /// Terminal-reason classification (see `LlmTerminalReason`).
        terminal_reason: LlmTerminalReason,
    },
}
138
/// A unit of work or a notification described as data, generic over the turn
/// protocol `M`.
///
/// Variants with an `id` field are the ones for which `Effect::id` returns
/// `Some`; most of them have a same-shaped counterpart in [`Response`].
/// NOTE(review): presumably these are emitted by the turn machine for its host
/// to execute — confirm against the machine implementation.
#[derive(Debug, Serialize, serde::Deserialize)]
#[allow(clippy::large_enum_variant)]
pub enum Effect<M: TurnProtocol = UnitTurnProtocol> {
    /// Request to synchronize the execution environment; answered by
    /// [`Response::ExecutionEnvironmentSynced`].
    SyncExecutionEnvironment {
        /// Identifier of this effect.
        id: EffectId,
        /// NOTE(review): presumably asks for the machine configuration to be
        /// refreshed from the sync result — confirm.
        update_machine_config: bool,
    },
    /// Request to run an LLM call; answered by [`Response::LlmComplete`].
    LlmCall {
        /// Identifier of this effect.
        id: EffectId,
        /// Shared request to send.
        request: Arc<LlmRequest>,
    },
    /// Request to cancel an LLM call.
    /// NOTE(review): whether `id` names the call being cancelled or this
    /// effect itself is not visible here — confirm.
    CancelLlm { id: EffectId },
    /// Request to execute a batch of tool calls; answered by
    /// [`Response::ToolResults`].
    ToolCalls {
        /// Identifier of this effect.
        id: EffectId,
        /// Calls to execute.
        calls: Vec<PendingToolCall>,
    },
    /// Request to execute code; answered by [`Response::ExecResult`].
    ExecCode {
        /// Identifier of this effect.
        id: EffectId,
        /// Language of `code`.
        language: String,
        /// Source text to execute.
        code: String,
    },
    /// Request to run a checkpoint; answered by [`Response::Checkpoint`].
    Checkpoint {
        /// Identifier of this effect.
        id: EffectId,
        /// Which kind of checkpoint to run.
        checkpoint: CheckpointKind,
    },
    /// Diagnostic log record; carries no id.
    Log { event: LogEvent },
    /// Session event to emit; carries no id.
    Emit(SessionEvent),
    /// Snapshot reported while the turn is in progress; carries no id.
    Progress {
        /// Message sequence at the time of the report.
        messages: MessageSequence,
        /// Event records accompanying the report.
        /// NOTE(review): presumably only the records added since the previous
        /// report, per the `delta` name — confirm.
        event_delta: Vec<SessionEventRecord<M::Event>>,
        /// Protocol iteration at the time of the report.
        protocol_iteration: usize,
    },
    /// Final snapshot with the same shape as `Progress`; carries no id.
    Done {
        /// Message sequence at completion.
        messages: MessageSequence,
        /// Event records accompanying the report (see `Progress::event_delta`).
        event_delta: Vec<SessionEventRecord<M::Event>>,
        /// Protocol iteration at completion.
        protocol_iteration: usize,
    },
}
203
204impl<M: TurnProtocol> Clone for Effect<M> {
205 fn clone(&self) -> Self {
206 match self {
207 Self::SyncExecutionEnvironment {
208 id,
209 update_machine_config,
210 } => Self::SyncExecutionEnvironment {
211 id: *id,
212 update_machine_config: *update_machine_config,
213 },
214 Self::LlmCall { id, request } => Self::LlmCall {
215 id: *id,
216 request: Arc::clone(request),
217 },
218 Self::CancelLlm { id } => Self::CancelLlm { id: *id },
219 Self::ToolCalls { id, calls } => Self::ToolCalls {
220 id: *id,
221 calls: calls.clone(),
222 },
223 Self::ExecCode { id, language, code } => Self::ExecCode {
224 id: *id,
225 language: language.clone(),
226 code: code.clone(),
227 },
228 Self::Checkpoint { id, checkpoint } => Self::Checkpoint {
229 id: *id,
230 checkpoint: *checkpoint,
231 },
232 Self::Log { event } => Self::Log {
233 event: event.clone(),
234 },
235 Self::Emit(event) => Self::Emit(event.clone()),
236 Self::Progress {
237 messages,
238 event_delta,
239 protocol_iteration,
240 } => Self::Progress {
241 messages: messages.clone(),
242 event_delta: event_delta.clone(),
243 protocol_iteration: *protocol_iteration,
244 },
245 Self::Done {
246 messages,
247 event_delta,
248 protocol_iteration,
249 } => Self::Done {
250 messages: messages.clone(),
251 event_delta: event_delta.clone(),
252 protocol_iteration: *protocol_iteration,
253 },
254 }
255 }
256}
257
258impl<M: TurnProtocol> Effect<M> {
259 fn id(&self) -> Option<EffectId> {
260 match self {
261 Self::SyncExecutionEnvironment { id, .. }
262 | Self::LlmCall { id, .. }
263 | Self::CancelLlm { id }
264 | Self::ToolCalls { id, .. }
265 | Self::ExecCode { id, .. }
266 | Self::Checkpoint { id, .. } => Some(*id),
267 Self::Log { .. } | Self::Emit(_) | Self::Progress { .. } | Self::Done { .. } => None,
268 }
269 }
270}
271
/// Error side of the result in [`Response::LlmComplete`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct LlmCallError {
    /// Error message.
    pub message: String,
    /// Whether the failure is flagged as retryable.
    pub retryable: bool,
    /// Raw error text, when available.
    pub raw: Option<String>,
    /// Error code, when available.
    pub code: Option<String>,
    /// Terminal-reason classification (see `LlmTerminalReason`).
    pub terminal_reason: LlmTerminalReason,
    /// Request body, when it was captured.
    pub request_body: Option<String>,
}
282
/// Results keyed by [`EffectId`]; each variant parallels one id-bearing
/// [`Effect`] variant.
///
/// NOTE(review): presumably `id` equals the id of the effect being answered —
/// confirm against the code that matches responses to effects.
pub enum Response {
    /// Outcome of an environment synchronization.
    ExecutionEnvironmentSynced {
        /// Identifier associated with this response.
        id: EffectId,
        /// `Ok` with optional synchronized data, or an error message.
        /// NOTE(review): the meaning of `Ok(None)` is not visible here.
        result: Result<Option<ExecutionEnvironmentSync>, String>,
    },
    /// Outcome of an LLM call.
    LlmComplete {
        /// Identifier associated with this response.
        id: EffectId,
        /// The model response, or a structured error.
        result: Result<LlmResponse, LlmCallError>,
        /// NOTE(review): presumably `true` when the response text was already
        /// streamed to the consumer — confirm.
        text_streamed: bool,
    },
    /// Outcomes of a batch of tool calls.
    ToolResults {
        /// Identifier associated with this response.
        id: EffectId,
        /// Completed calls.
        results: Vec<CompletedToolCall>,
    },
    /// Outcome of a code execution.
    ExecResult {
        /// Identifier associated with this response.
        id: EffectId,
        /// The execution response, or an error message.
        result: Result<crate::ExecResponse, String>,
    },
    /// Outcome of a checkpoint.
    Checkpoint {
        /// Identifier associated with this response.
        id: EffectId,
        /// What the checkpoint delivered.
        delivery: CheckpointDelivery,
    },
}
314
/// Data carried by a successful [`Response::ExecutionEnvironmentSynced`].
///
/// Both fields have the same types as the identically named fields of
/// [`TurnMachineConfig`].
#[derive(Clone, Debug, Serialize, serde::Deserialize)]
pub struct ExecutionEnvironmentSync {
    /// System prompt text.
    pub system_prompt: Arc<str>,
    /// Tool specifications.
    pub tool_specs: Arc<Vec<LlmToolSpec>>,
}
320
/// State handed to `ProtocolDriverHandle::handle_llm_success` alongside the
/// LLM response.
pub struct WaitingLlmState<M: TurnProtocol = UnitTurnProtocol> {
    /// The LLM request associated with this state.
    pub request: Arc<LlmRequest>,
    /// Optional driver state; private, retrieved once via `take_driver_state`.
    driver_state: Option<M::DriverState>,
}
325
326impl<M: TurnProtocol> WaitingLlmState<M> {
327 pub fn take_driver_state(&mut self) -> Option<M::DriverState> {
328 self.driver_state.take()
329 }
330}
331
/// State handed to `ProtocolDriverHandle::handle_exec_result` alongside the
/// execution result.
pub struct WaitingExecState<M: TurnProtocol = UnitTurnProtocol> {
    /// Driver state; private, retrieved by consuming the value with
    /// `into_driver_state`.
    driver_state: M::DriverState,
}
335
336impl<M: TurnProtocol> WaitingExecState<M> {
337 pub fn into_driver_state(self) -> M::DriverState {
338 self.driver_state
339 }
340}
341
/// Follow-up selected by the `on_empty` field of
/// [`DriverAction::StartCheckpoint`].
///
/// NOTE(review): presumably applied when the checkpoint delivers nothing —
/// confirm against the checkpoint handling code.
#[derive(Clone, Debug, PartialEq, Serialize, serde::Deserialize)]
pub enum CheckpointResumeAction {
    /// Continue by preparing a protocol iteration.
    PrepareIteration,
    /// Finish with the given outcome.
    Finish(TurnOutcome),
}
347
/// Instruction returned by the methods of [`ProtocolDriverHandle`].
///
/// Each driver callback returns a `Vec` of these.
#[allow(clippy::large_enum_variant)]
pub enum DriverAction<M: TurnProtocol = UnitTurnProtocol> {
    /// Emit a session event.
    Emit(SessionEvent),
    /// Append event records.
    AppendEvents(Vec<SessionEventRecord<M::Event>>),
    /// Start an LLM call.
    StartLlm {
        /// Request to send.
        request: Arc<LlmRequest>,
        /// Optional driver state; [`WaitingLlmState`] has a field of the same
        /// type.
        driver_state: Option<M::DriverState>,
    },
    /// Start a batch of tool calls.
    StartTools {
        /// Calls to execute.
        calls: Vec<PendingToolCall>,
    },
    /// Start a code execution.
    StartExec {
        /// Language of `code`.
        language: String,
        /// Source text to execute.
        code: String,
        /// Driver state; [`WaitingExecState`] has a field of the same type.
        driver_state: M::DriverState,
    },
    /// Start a checkpoint.
    StartCheckpoint {
        /// Which kind of checkpoint to run.
        checkpoint: CheckpointKind,
        /// Follow-up action (see [`CheckpointResumeAction`]).
        on_empty: CheckpointResumeAction,
    },
    /// Advance to the next protocol iteration.
    AdvanceProtocolIteration,
    /// Schedule the turn-limit final step with the given message.
    /// NOTE(review): the exact scheduling semantics are not visible here.
    ScheduleTurnLimitFinal {
        /// Message to use for the final step.
        message: Message,
    },
    /// Finish the turn with the given outcome.
    Finish(TurnOutcome),
}
374
/// Borrowed view of turn state passed to every [`ProtocolDriverHandle`]
/// callback.
///
/// All fields are private; they are read through the accessor methods.
pub struct DriverContextView<'a, M: TurnProtocol = UnitTurnProtocol> {
    /// Machine configuration.
    config: &'a TurnMachineConfig<M>,
    /// Message sequence.
    messages: &'a MessageSequence,
    /// Event records.
    events: &'a [SessionEventRecord<M::Event>],
    /// Turn causes.
    turn_causes: &'a [TurnCause],
    /// Protocol iteration counter.
    protocol_iteration: usize,
    /// Offset passed to the turn-limit computation together with
    /// `protocol_iteration`.
    protocol_run_offset: usize,
    /// Termination policy state. This is distinct from `config.termination`,
    /// which is what the `termination()` accessor returns.
    termination: &'a TurnTerminationPolicyState,
}
384
385impl<'a, M: TurnProtocol> DriverContextView<'a, M> {
386 pub fn project_llm_request(&self, use_tools: bool) -> Arc<LlmRequest> {
387 self.config.projector.project(ProjectorContext {
388 config: self.config,
389 messages: self.messages,
390 events: self.events,
391 turn_causes: self.turn_causes,
392 protocol_iteration: self.protocol_iteration,
393 use_tools,
394 })
395 }
396
397 pub fn protocol_iteration(&self) -> usize {
398 self.protocol_iteration
399 }
400
401 pub fn protocol_run_offset(&self) -> usize {
402 self.protocol_run_offset
403 }
404
405 pub fn max_turns(&self) -> Option<usize> {
406 self.config.max_turns
407 }
408
409 pub fn termination(&self) -> &M::Termination {
410 &self.config.termination
411 }
412
413 pub fn autonomous(&self) -> bool {
414 self.config.autonomous
415 }
416
417 pub fn should_force_exit_after_grace_turn(&self) -> bool {
418 self.termination.should_force_exit_after_grace_turn()
419 }
420
421 pub fn turn_limit_final_to_schedule(&self) -> Option<usize> {
422 self.termination.turn_limit_final_to_schedule(
423 self.protocol_iteration,
424 self.protocol_run_offset,
425 self.config.max_turns,
426 )
427 }
428
429 pub fn messages(&self) -> &MessageSequence {
430 self.messages
431 }
432
433 pub fn events(&self) -> &[SessionEventRecord<M::Event>] {
434 self.events
435 }
436
437 pub fn turn_causes(&self) -> &[TurnCause] {
438 self.turn_causes
439 }
440}
441
/// Borrowed inputs handed to `ContextProjector::project`.
pub struct ProjectorContext<'a, M: TurnProtocol = UnitTurnProtocol> {
    /// Machine configuration.
    pub config: &'a TurnMachineConfig<M>,
    /// Message sequence to project.
    pub messages: &'a MessageSequence,
    /// Event records.
    pub events: &'a [SessionEventRecord<M::Event>],
    /// Turn causes.
    pub turn_causes: &'a [TurnCause],
    /// Protocol iteration counter.
    pub protocol_iteration: usize,
    /// Whether the request should offer tools to the model.
    pub use_tools: bool,
}
450
/// Strategy that turns a [`ProjectorContext`] into an [`LlmRequest`].
pub trait ContextProjector<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
    /// Builds the request from the borrowed state in `ctx`.
    fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest>;
}
454
/// Stateless [`ContextProjector`] implementation.
///
/// It renders the message sequence into chat messages, appends the turn-event
/// section when there are turn causes, and prepends the system prompt when it
/// is not blank.
#[derive(Clone, Debug, Default)]
pub struct ChatContextProjector;
457
458impl<M: TurnProtocol> ContextProjector<M> for ChatContextProjector {
459 fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest> {
460 let rendered_prompt = render_messages_for_projector(ctx.messages, ctx.turn_causes);
461 let attachments: Vec<LlmAttachment> = rendered_prompt.attachments;
462 let mut messages = rendered_prompt.messages;
463 if let Some(turn_events) = render_turn_causes_prompt(ctx.turn_causes) {
464 messages.push(crate::llm::types::LlmMessage::text(
465 crate::llm::types::LlmRole::User,
466 Arc::from(turn_events),
467 ));
468 }
469 if !ctx.config.system_prompt.trim().is_empty() {
470 messages.insert(
471 0,
472 crate::llm::types::LlmMessage::text(
473 crate::llm::types::LlmRole::System,
474 Arc::clone(&ctx.config.system_prompt),
475 ),
476 );
477 }
478
479 Arc::new(LlmRequest {
480 model: ctx.config.model.clone(),
481 messages,
482 attachments,
483 tools: if ctx.use_tools {
484 Arc::clone(&ctx.config.tool_specs)
485 } else {
486 Arc::new(Vec::new())
487 },
488 tool_choice: if ctx.use_tools {
489 LlmToolChoice::Auto
490 } else {
491 LlmToolChoice::None
492 },
493 model_variant: ctx.config.model_variant.clone(),
494 generation: ctx.config.generation.clone(),
495 session_id: ctx.config.run_session_id.clone(),
496 output_spec: None,
497 stream_events: None,
498 provider_trace: None,
499 })
500 }
501}
502
503fn render_messages_for_projector(
504 messages: &MessageSequence,
505 turn_causes: &[TurnCause],
506) -> crate::RenderedPrompt {
507 if turn_causes.is_empty() {
508 return messages.render_prompt();
509 }
510
511 let active_cause_ids = turn_causes
512 .iter()
513 .map(|cause| cause.id.as_str())
514 .collect::<HashSet<_>>();
515 let filtered = messages
516 .iter()
517 .filter(|message| {
518 !(matches!(message.role, MessageRole::Event)
519 && active_cause_ids.contains(message.id.as_str()))
520 })
521 .cloned()
522 .collect::<Vec<_>>();
523 render_prompt(filtered.as_slice())
524}
525
/// Protocol-specific decision logic, stored in
/// `TurnMachineConfig::protocol_driver`.
///
/// Every callback receives a [`DriverContextView`] of the current state and
/// returns the list of [`DriverAction`]s to perform.
/// NOTE(review): when each callback is invoked is inferred from its name and
/// parameters; the calling code is not visible here.
pub trait ProtocolDriverHandle<M: TurnProtocol = UnitTurnProtocol>: Send + Sync {
    /// Returns the actions for preparing a protocol iteration.
    fn prepare_protocol_iteration(&self, ctx: DriverContextView<'_, M>) -> Vec<DriverAction<M>>;
    /// Returns the actions to take for a successful LLM response.
    ///
    /// `waiting` holds the request and any driver state; `text_streamed` has
    /// the same name and type as the field on [`Response::LlmComplete`].
    fn handle_llm_success(
        &self,
        ctx: DriverContextView<'_, M>,
        waiting: WaitingLlmState<M>,
        llm_response: LlmResponse,
        text_streamed: bool,
    ) -> Vec<DriverAction<M>>;
    /// Returns the actions to take for a batch of completed tool calls.
    fn handle_tool_results(
        &self,
        ctx: DriverContextView<'_, M>,
        completed: Vec<CompletedToolCall>,
    ) -> Vec<DriverAction<M>>;
    /// Returns the actions to take for a code-execution result, which may be
    /// an error message.
    ///
    /// `waiting` holds the driver state associated with the execution.
    fn handle_exec_result(
        &self,
        ctx: DriverContextView<'_, M>,
        waiting: WaitingExecState<M>,
        result: Result<crate::ExecResponse, String>,
    ) -> Vec<DriverAction<M>>;
}
547
/// Configuration shared by the turn machine, its driver and its projector.
pub struct TurnMachineConfig<M: TurnProtocol = UnitTurnProtocol> {
    /// Protocol-specific decision logic.
    pub protocol_driver: Arc<dyn ProtocolDriverHandle<M>>,
    /// Projector used by `DriverContextView::project_llm_request`.
    pub projector: Arc<dyn ContextProjector<M>>,
    /// NOTE(review): presumably enables the environment synchronization step
    /// (`Effect::SyncExecutionEnvironment`) — not read in the visible code.
    pub sync_execution_environment: bool,
    /// Model name; `ChatContextProjector` copies it into the request.
    pub model: String,
    /// Optional context-size limit in tokens (not read in the visible code).
    pub max_context_tokens: Option<usize>,
    /// Optional turn limit; exposed via `DriverContextView::max_turns` and
    /// passed to the turn-limit computation.
    pub max_turns: Option<usize>,
    /// Optional model variant; `ChatContextProjector` copies it into the request.
    pub model_variant: Option<String>,
    /// Generation options; `ChatContextProjector` copies them into the request.
    pub generation: crate::llm::types::GenerationOptions,
    /// Optional id that `ChatContextProjector` places in the request's
    /// `session_id` field.
    pub run_session_id: Option<String>,
    /// Flag exposed via `DriverContextView::autonomous`.
    pub autonomous: bool,
    /// Tool specifications offered when a request is projected with tools.
    pub tool_specs: Arc<Vec<LlmToolSpec>>,
    /// System prompt; `ChatContextProjector` omits it when it is blank.
    pub system_prompt: Arc<str>,
    /// Session identifier. This is a separate field from `run_session_id`.
    pub session_id: String,
    /// NOTE(review): presumably enables LLM trace/log emission — not read in
    /// the visible code.
    pub emit_llm_trace: bool,
    /// Protocol-specific termination value, exposed via
    /// `DriverContextView::termination`.
    pub termination: M::Termination,
    /// Turn-limit final message setting (see `crate::TurnLimitFinalMessage`).
    pub turn_limit_final_message: crate::TurnLimitFinalMessage,
}
571
572