1use std::collections::VecDeque;
8use std::fmt::Debug;
9use std::sync::Arc;
10use std::time::Duration;
11
12use serde::Serialize;
13use serde::de::DeserializeOwned;
14use serde_json::Value;
15
16use crate::llm::types::{
17 LlmAttachment, LlmOutputPart, LlmRequest, LlmResponse, LlmTerminalReason, LlmToolChoice,
18 LlmToolSpec, ProviderReplayMeta,
19};
20use crate::session_model::message::MessageOrigin;
21use crate::session_model::{
22 Message, MessageRole, MessageSequence, Part, PartKind, PruneState, SessionEvent,
23 SessionEventRecord, TokenUsage, ToolEvent, TurnTerminationPolicyState, fresh_message_id,
24 make_error_event, reassign_part_ids,
25};
26use crate::{
27 CheckpointKind, ModelToolReturn, PluginMessage, ToolCallOutput, TurnOutcome, TurnStop,
28};
29
30pub trait ModeProtocol: Send + Sync + 'static {
33 type Event: Clone + Serialize + DeserializeOwned + Debug + Send + Sync + 'static;
34 type Termination: Clone + Default + Debug + Send + Sync + 'static;
35 type DriverState: Clone + Default + Serialize + DeserializeOwned + Debug + Send + Sync + 'static;
36}
37
38#[derive(Clone, Debug, Serialize, serde::Deserialize)]
39pub struct UnitModeProtocol;
40
41impl ModeProtocol for UnitModeProtocol {
42 type Event = ();
43 type Termination = ();
44 type DriverState = serde_json::Value;
45}
46
47#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, serde::Deserialize)]
49pub struct EffectId(pub u64);
50
51#[derive(Clone, Debug, Serialize, serde::Deserialize)]
52pub struct PendingToolCall {
53 pub call_id: String,
54 pub tool_name: String,
55 pub args: Value,
56 pub replay: Option<ProviderReplayMeta>,
58}
59
60#[derive(Clone, Debug, Serialize, serde::Deserialize)]
61pub struct CompletedToolCall {
62 pub call_id: String,
63 pub tool_name: String,
64 pub args: Value,
65 pub output: ToolCallOutput,
66 pub model_return: ModelToolReturn,
67 pub duration_ms: u64,
68 pub replay: Option<ProviderReplayMeta>,
70}
71
72#[derive(Clone, Debug, Serialize, serde::Deserialize)]
73pub enum LogEvent {
74 LlmDebug {
75 session_id: String,
76 mode_iteration: usize,
77 usage: TokenUsage,
78 provider_usage: Option<Value>,
79 request_body: Option<String>,
80 response_text: String,
81 response_parts: Option<Value>,
82 },
83 LlmError {
84 session_id: String,
85 mode_iteration: usize,
86 request_body: Option<String>,
87 message: String,
88 retryable: bool,
89 raw: Option<String>,
90 code: Option<String>,
91 terminal_reason: LlmTerminalReason,
92 },
93}
94
95#[derive(Debug)]
97#[allow(clippy::large_enum_variant)]
98pub enum Effect<M: ModeProtocol = UnitModeProtocol> {
99 SyncExecutionSurface {
107 id: EffectId,
108 update_machine_config: bool,
109 },
110 LlmCall {
112 id: EffectId,
113 request: Arc<LlmRequest>,
114 },
115 CancelLlm { id: EffectId },
117 ToolCalls {
119 id: EffectId,
120 calls: Vec<PendingToolCall>,
121 },
122 ExecCode { id: EffectId, code: String },
124 Checkpoint {
126 id: EffectId,
127 checkpoint: CheckpointKind,
128 },
129 Sleep { id: EffectId, duration: Duration },
131 Log { event: LogEvent },
133 Emit(SessionEvent),
135 Progress {
141 messages: MessageSequence,
142 events: Arc<Vec<SessionEventRecord<M::Event>>>,
143 mode_iteration: usize,
144 },
145 Done {
147 messages: MessageSequence,
148 events: Arc<Vec<SessionEventRecord<M::Event>>>,
149 mode_iteration: usize,
150 },
151}
152
153impl<M: ModeProtocol> Effect<M> {
154 fn id(&self) -> Option<EffectId> {
155 match self {
156 Self::SyncExecutionSurface { id, .. }
157 | Self::LlmCall { id, .. }
158 | Self::CancelLlm { id }
159 | Self::ToolCalls { id, .. }
160 | Self::ExecCode { id, .. }
161 | Self::Checkpoint { id, .. }
162 | Self::Sleep { id, .. } => Some(*id),
163 Self::Log { .. } | Self::Emit(_) | Self::Progress { .. } | Self::Done { .. } => None,
164 }
165 }
166}
167
168#[derive(Clone, Debug, Serialize, serde::Deserialize)]
170#[allow(clippy::large_enum_variant)]
171pub enum CheckpointEffect<M: ModeProtocol = UnitModeProtocol> {
172 SyncExecutionSurface {
173 id: EffectId,
174 update_machine_config: bool,
175 },
176 LlmCall {
177 id: EffectId,
178 request: Arc<LlmRequest>,
179 },
180 CancelLlm {
181 id: EffectId,
182 },
183 ToolCalls {
184 id: EffectId,
185 calls: Vec<PendingToolCall>,
186 },
187 ExecCode {
188 id: EffectId,
189 code: String,
190 },
191 Checkpoint {
192 id: EffectId,
193 checkpoint: CheckpointKind,
194 },
195 Sleep {
196 id: EffectId,
197 duration: Duration,
198 },
199 Log {
200 event: LogEvent,
201 },
202 Emit(SessionEvent),
203 Progress {
204 messages: Vec<Message>,
205 events: Vec<SessionEventRecord<M::Event>>,
206 mode_iteration: usize,
207 },
208 Done {
209 messages: Vec<Message>,
210 events: Vec<SessionEventRecord<M::Event>>,
211 mode_iteration: usize,
212 },
213}
214
215#[derive(Clone, Debug, Serialize, serde::Deserialize)]
216pub struct LlmCallError {
217 pub message: String,
218 pub retryable: bool,
219 pub raw: Option<String>,
220 pub code: Option<String>,
221 pub terminal_reason: LlmTerminalReason,
222 pub request_body: Option<String>,
223}
224
225pub enum Response {
227 ExecutionSurfaceSynced {
229 id: EffectId,
230 result: Result<Option<ExecutionSurfaceSync>, String>,
231 },
232 LlmComplete {
234 id: EffectId,
235 result: Result<LlmResponse, LlmCallError>,
236 text_streamed: bool,
239 },
240 ToolResults {
242 id: EffectId,
243 results: Vec<CompletedToolCall>,
244 },
245 ExecResult {
247 id: EffectId,
248 result: Result<crate::ExecResponse, String>,
249 },
250 Checkpoint {
252 id: EffectId,
253 messages: Vec<PluginMessage>,
254 transient_messages: Vec<PluginMessage>,
255 },
256 Timeout { id: EffectId },
258}
259
260#[derive(Clone, Serialize, serde::Deserialize)]
261pub struct ExecutionSurfaceSync {
262 pub system_prompt: Arc<str>,
263 pub tool_specs: Arc<Vec<LlmToolSpec>>,
264}
265
266pub fn driver_state<T>(state: T) -> serde_json::Value
267where
268 T: Serialize,
269{
270 serde_json::to_value(state).expect("driver state must serialize")
271}
272
273pub struct WaitingLlmState<M: ModeProtocol = UnitModeProtocol> {
274 pub request: Arc<LlmRequest>,
275 driver_state: Option<M::DriverState>,
276}
277
278impl<M: ModeProtocol> WaitingLlmState<M> {
279 pub fn take_driver_state<T>(&mut self) -> Option<T>
280 where
281 T: DeserializeOwned,
282 {
283 self.driver_state
284 .take()
285 .and_then(|state| serde_json::to_value(state).ok())
286 .and_then(|state| serde_json::from_value(state).ok())
287 }
288}
289
290pub struct WaitingExecState<M: ModeProtocol = UnitModeProtocol> {
291 driver_state: M::DriverState,
292}
293
294impl<M: ModeProtocol> WaitingExecState<M> {
295 pub fn into_driver_state<T>(self) -> Option<T>
296 where
297 T: DeserializeOwned,
298 {
299 serde_json::to_value(self.driver_state)
300 .ok()
301 .and_then(|state| serde_json::from_value(state).ok())
302 }
303}
304
305#[derive(Clone, Debug, PartialEq, Serialize, serde::Deserialize)]
306pub enum CheckpointResumeAction {
307 PrepareIteration,
308 Finish(TurnOutcome),
309}
310
311#[allow(clippy::large_enum_variant)]
312pub enum DriverAction<M: ModeProtocol = UnitModeProtocol> {
313 Emit(SessionEvent),
314 AppendEvents(Vec<SessionEventRecord<M::Event>>),
315 StartLlm {
316 request: Arc<LlmRequest>,
317 driver_state: Option<M::DriverState>,
318 },
319 StartTools {
320 calls: Vec<PendingToolCall>,
321 },
322 StartExec {
323 code: String,
324 driver_state: M::DriverState,
325 },
326 StartCheckpoint {
327 checkpoint: CheckpointKind,
328 on_empty: CheckpointResumeAction,
329 },
330 AdvanceModeIteration,
331 ScheduleTurnLimitFinal,
332 Finish(TurnOutcome),
333}
334
335pub struct DriverContextView<'a, M: ModeProtocol = UnitModeProtocol> {
336 config: &'a TurnMachineConfig<M>,
337 messages: &'a MessageSequence,
338 events: &'a [SessionEventRecord<M::Event>],
339 mode_iteration: usize,
340 mode_run_offset: usize,
341 termination: &'a TurnTerminationPolicyState,
342}
343
344impl<'a, M: ModeProtocol> DriverContextView<'a, M> {
345 pub fn project_llm_request(&self, use_tools: bool) -> Arc<LlmRequest> {
346 self.config.projector.project(ProjectorContext {
347 config: self.config,
348 messages: self.messages,
349 events: self.events,
350 mode_iteration: self.mode_iteration,
351 use_tools,
352 })
353 }
354
355 pub fn mode_iteration(&self) -> usize {
356 self.mode_iteration
357 }
358
359 pub fn mode_run_offset(&self) -> usize {
360 self.mode_run_offset
361 }
362
363 pub fn max_turns(&self) -> Option<usize> {
364 self.config.max_turns
365 }
366
367 pub fn termination(&self) -> &M::Termination {
368 &self.config.termination
369 }
370
371 pub fn autonomous(&self) -> bool {
372 self.config.autonomous
373 }
374
375 pub fn should_force_exit_after_grace_turn(&self) -> bool {
376 self.termination.should_force_exit_after_grace_turn()
377 }
378
379 pub fn messages(&self) -> &MessageSequence {
380 self.messages
381 }
382
383 pub fn events(&self) -> &[SessionEventRecord<M::Event>] {
384 self.events
385 }
386}
387
388pub struct ProjectorContext<'a, M: ModeProtocol = UnitModeProtocol> {
389 pub config: &'a TurnMachineConfig<M>,
390 pub messages: &'a MessageSequence,
391 pub events: &'a [SessionEventRecord<M::Event>],
392 pub mode_iteration: usize,
393 pub use_tools: bool,
394}
395
396pub trait ContextProjector<M: ModeProtocol = UnitModeProtocol>: Send + Sync {
397 fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest>;
398}
399
400#[derive(Clone, Debug, Default)]
401pub struct ChatContextProjector;
402
403impl<M: ModeProtocol> ContextProjector<M> for ChatContextProjector {
404 fn project(&self, ctx: ProjectorContext<'_, M>) -> Arc<LlmRequest> {
405 let rendered_prompt = ctx.messages.render_prompt();
406 let attachments: Vec<LlmAttachment> = rendered_prompt.attachments;
407 let mut messages = rendered_prompt.messages;
408 if !ctx.config.system_prompt.trim().is_empty() {
409 messages.insert(
410 0,
411 crate::llm::types::LlmMessage::text(
412 crate::llm::types::LlmRole::System,
413 Arc::clone(&ctx.config.system_prompt),
414 ),
415 );
416 }
417
418 Arc::new(LlmRequest {
419 model: ctx.config.model.clone(),
420 messages,
421 attachments,
422 tools: if ctx.use_tools {
423 Arc::clone(&ctx.config.tool_specs)
424 } else {
425 Arc::new(Vec::new())
426 },
427 tool_choice: if ctx.use_tools {
428 LlmToolChoice::Auto
429 } else {
430 LlmToolChoice::None
431 },
432 model_variant: ctx.config.model_variant.clone(),
433 session_id: ctx.config.run_session_id.clone(),
434 output_spec: None,
435 stream_events: None,
436 provider_trace: None,
437 })
438 }
439}
440
441pub trait ProtocolDriverHandle<M: ModeProtocol = UnitModeProtocol>: Send + Sync {
442 fn prepare_mode_iteration(&self, ctx: DriverContextView<'_, M>) -> Vec<DriverAction<M>>;
443 fn handle_llm_success(
444 &self,
445 ctx: DriverContextView<'_, M>,
446 waiting: WaitingLlmState<M>,
447 llm_response: LlmResponse,
448 text_streamed: bool,
449 ) -> Vec<DriverAction<M>>;
450 fn handle_tool_results(
451 &self,
452 ctx: DriverContextView<'_, M>,
453 completed: Vec<CompletedToolCall>,
454 ) -> Vec<DriverAction<M>>;
455 fn handle_exec_result(
456 &self,
457 ctx: DriverContextView<'_, M>,
458 waiting: WaitingExecState<M>,
459 result: Result<crate::ExecResponse, String>,
460 ) -> Vec<DriverAction<M>>;
461}
462
463pub struct TurnMachineConfig<M: ModeProtocol = UnitModeProtocol> {
465 pub protocol_driver: Arc<dyn ProtocolDriverHandle<M>>,
466 pub projector: Arc<dyn ContextProjector<M>>,
467 pub sync_execution_surface: bool,
468 pub model: String,
469 pub max_turns: Option<usize>,
470 pub model_variant: Option<String>,
471 pub run_session_id: Option<String>,
472 pub autonomous: bool,
473 pub tool_specs: Arc<Vec<LlmToolSpec>>,
474 pub system_prompt: Arc<str>,
475 pub session_id: String,
476 pub emit_llm_trace: bool,
477 pub termination: M::Termination,
478}
479
480#[derive(Debug, Serialize, serde::Deserialize)]
483enum MachineState<M: ModeProtocol = UnitModeProtocol> {
484 PreparingMode,
485 WaitingExecutionSurface {
486 effect_id: EffectId,
487 update_machine_config: bool,
488 },
489 PrepareIteration,
490 WaitingLlm {
491 effect_id: EffectId,
492 request: Arc<LlmRequest>,
493 driver_state: Option<M::DriverState>,
494 },
495 WaitingTools {
496 effect_id: EffectId,
497 calls: Vec<PendingToolCall>,
498 },
499 WaitingExec {
500 effect_id: EffectId,
501 code: String,
502 driver_state: M::DriverState,
503 },
504 WaitingCheckpoint {
505 effect_id: EffectId,
506 checkpoint: CheckpointKind,
507 on_empty: CheckpointResumeAction,
508 },
509 Finished,
510}
511
512#[derive(Clone, Debug, Serialize, serde::Deserialize)]
513pub struct TurnCheckpoint<M: ModeProtocol = UnitModeProtocol> {
514 state: MachineState<M>,
515 pending_effects: Vec<CheckpointEffect<M>>,
516 next_effect_id: u64,
517 messages: Vec<Message>,
518 events: Vec<SessionEventRecord<M::Event>>,
519 mode_iteration: usize,
520 mode_run_offset: usize,
521 cumulative_usage: TokenUsage,
522 termination: TurnTerminationPolicyState,
523 synced_mode_iteration: Option<usize>,
524}
525
526impl<M: ModeProtocol> Clone for MachineState<M> {
527 fn clone(&self) -> Self {
528 match self {
529 Self::PreparingMode => Self::PreparingMode,
530 Self::WaitingExecutionSurface {
531 effect_id,
532 update_machine_config,
533 } => Self::WaitingExecutionSurface {
534 effect_id: *effect_id,
535 update_machine_config: *update_machine_config,
536 },
537 Self::PrepareIteration => Self::PrepareIteration,
538 Self::WaitingLlm {
539 effect_id,
540 request,
541 driver_state,
542 } => Self::WaitingLlm {
543 effect_id: *effect_id,
544 request: Arc::clone(request),
545 driver_state: driver_state.clone(),
546 },
547 Self::WaitingTools { effect_id, calls } => Self::WaitingTools {
548 effect_id: *effect_id,
549 calls: calls.clone(),
550 },
551 Self::WaitingExec {
552 effect_id,
553 code,
554 driver_state,
555 } => Self::WaitingExec {
556 effect_id: *effect_id,
557 code: code.clone(),
558 driver_state: driver_state.clone(),
559 },
560 Self::WaitingCheckpoint {
561 effect_id,
562 checkpoint,
563 on_empty,
564 } => Self::WaitingCheckpoint {
565 effect_id: *effect_id,
566 checkpoint: *checkpoint,
567 on_empty: on_empty.clone(),
568 },
569 Self::Finished => Self::Finished,
570 }
571 }
572}
573
574impl<M: ModeProtocol> CheckpointEffect<M> {
575 fn from_effect(effect: &Effect<M>) -> Self {
576 match effect {
577 Effect::SyncExecutionSurface {
578 id,
579 update_machine_config,
580 } => Self::SyncExecutionSurface {
581 id: *id,
582 update_machine_config: *update_machine_config,
583 },
584 Effect::LlmCall { id, request } => Self::LlmCall {
585 id: *id,
586 request: Arc::clone(request),
587 },
588 Effect::CancelLlm { id } => Self::CancelLlm { id: *id },
589 Effect::ToolCalls { id, calls } => Self::ToolCalls {
590 id: *id,
591 calls: calls.clone(),
592 },
593 Effect::ExecCode { id, code } => Self::ExecCode {
594 id: *id,
595 code: code.clone(),
596 },
597 Effect::Checkpoint { id, checkpoint } => Self::Checkpoint {
598 id: *id,
599 checkpoint: *checkpoint,
600 },
601 Effect::Sleep { id, duration } => Self::Sleep {
602 id: *id,
603 duration: *duration,
604 },
605 Effect::Log { event } => Self::Log {
606 event: event.clone(),
607 },
608 Effect::Emit(event) => Self::Emit(event.clone()),
609 Effect::Progress {
610 messages,
611 events,
612 mode_iteration,
613 } => Self::Progress {
614 messages: messages.iter().cloned().collect(),
615 events: events.as_ref().clone(),
616 mode_iteration: *mode_iteration,
617 },
618 Effect::Done {
619 messages,
620 events,
621 mode_iteration,
622 } => Self::Done {
623 messages: messages.iter().cloned().collect(),
624 events: events.as_ref().clone(),
625 mode_iteration: *mode_iteration,
626 },
627 }
628 }
629
630 fn into_effect(self) -> Effect<M> {
631 match self {
632 Self::SyncExecutionSurface {
633 id,
634 update_machine_config,
635 } => Effect::SyncExecutionSurface {
636 id,
637 update_machine_config,
638 },
639 Self::LlmCall { id, request } => Effect::LlmCall { id, request },
640 Self::CancelLlm { id } => Effect::CancelLlm { id },
641 Self::ToolCalls { id, calls } => Effect::ToolCalls { id, calls },
642 Self::ExecCode { id, code } => Effect::ExecCode { id, code },
643 Self::Checkpoint { id, checkpoint } => Effect::Checkpoint { id, checkpoint },
644 Self::Sleep { id, duration } => Effect::Sleep { id, duration },
645 Self::Log { event } => Effect::Log { event },
646 Self::Emit(event) => Effect::Emit(event),
647 Self::Progress {
648 messages,
649 events,
650 mode_iteration,
651 } => Effect::Progress {
652 messages: MessageSequence::from_owned(messages),
653 events: Arc::new(events),
654 mode_iteration,
655 },
656 Self::Done {
657 messages,
658 events,
659 mode_iteration,
660 } => Effect::Done {
661 messages: MessageSequence::from_owned(messages),
662 events: Arc::new(events),
663 mode_iteration,
664 },
665 }
666 }
667}
668
669impl<M: ModeProtocol> MachineState<M> {
670 fn outstanding_effect_id(&self) -> Option<EffectId> {
671 match self {
672 Self::WaitingExecutionSurface { effect_id, .. }
673 | Self::WaitingLlm { effect_id, .. }
674 | Self::WaitingTools { effect_id, .. }
675 | Self::WaitingExec { effect_id, .. }
676 | Self::WaitingCheckpoint { effect_id, .. } => Some(*effect_id),
677 Self::PreparingMode | Self::PrepareIteration | Self::Finished => None,
678 }
679 }
680
681 fn outstanding_effect(&self) -> Option<Effect<M>> {
682 match self {
683 Self::WaitingExecutionSurface {
684 effect_id,
685 update_machine_config,
686 } => Some(Effect::SyncExecutionSurface {
687 id: *effect_id,
688 update_machine_config: *update_machine_config,
689 }),
690 Self::WaitingLlm {
691 effect_id, request, ..
692 } => Some(Effect::LlmCall {
693 id: *effect_id,
694 request: Arc::clone(request),
695 }),
696 Self::WaitingTools { effect_id, calls } => Some(Effect::ToolCalls {
697 id: *effect_id,
698 calls: calls.clone(),
699 }),
700 Self::WaitingExec {
701 effect_id, code, ..
702 } => Some(Effect::ExecCode {
703 id: *effect_id,
704 code: code.clone(),
705 }),
706 Self::WaitingCheckpoint {
707 effect_id,
708 checkpoint,
709 ..
710 } => Some(Effect::Checkpoint {
711 id: *effect_id,
712 checkpoint: *checkpoint,
713 }),
714 Self::PreparingMode | Self::PrepareIteration | Self::Finished => None,
715 }
716 }
717}
718
719pub struct TurnMachine<M: ModeProtocol = UnitModeProtocol> {
721 config: TurnMachineConfig<M>,
722 state: MachineState<M>,
723 pending_effects: VecDeque<Effect<M>>,
724 active_effect_redelivery: bool,
725 next_effect_id: u64,
726 messages: MessageSequence,
727 events: Arc<Vec<SessionEventRecord<M::Event>>>,
728 mode_iteration: usize,
729 mode_run_offset: usize,
730 cumulative_usage: TokenUsage,
731 termination: TurnTerminationPolicyState,
732 synced_mode_iteration: Option<usize>,
733}
734
735impl<M: ModeProtocol> TurnMachine<M> {
736 pub fn new(
738 config: TurnMachineConfig<M>,
739 messages: Vec<Message>,
740 events: Arc<Vec<SessionEventRecord<M::Event>>>,
741 mode_run_offset: usize,
742 ) -> Self {
743 Self::new_shared(
744 config,
745 MessageSequence::from_owned(messages),
746 events,
747 mode_run_offset,
748 )
749 }
750
751 pub fn new_shared(
752 config: TurnMachineConfig<M>,
753 messages: MessageSequence,
754 events: Arc<Vec<SessionEventRecord<M::Event>>>,
755 mode_run_offset: usize,
756 ) -> Self {
757 Self {
758 config,
759 state: MachineState::PreparingMode,
760 pending_effects: VecDeque::new(),
761 active_effect_redelivery: false,
762 next_effect_id: 1,
763 messages,
764 events,
765 mode_iteration: mode_run_offset,
766 mode_run_offset,
767 cumulative_usage: TokenUsage::default(),
768 termination: TurnTerminationPolicyState::new(),
769 synced_mode_iteration: None,
770 }
771 }
772
773 pub fn is_done(&self) -> bool {
775 matches!(self.state, MachineState::Finished)
776 }
777
778 pub fn messages(&self) -> Arc<Vec<Message>> {
779 self.messages.shared()
780 }
781
782 pub fn events(&self) -> Arc<Vec<SessionEventRecord<M::Event>>> {
783 Arc::clone(&self.events)
784 }
785
786 pub fn message_sequence(&self) -> MessageSequence {
787 self.messages.clone()
788 }
789
790 pub fn mode_iteration(&self) -> usize {
791 self.mode_iteration
792 }
793
794 pub fn checkpoint(&self) -> TurnCheckpoint<M> {
795 let active_effect_id = self.state.outstanding_effect_id();
796 let pending_effects = self
797 .pending_effects
798 .iter()
799 .filter(|effect| active_effect_id.is_none_or(|id| effect.id() != Some(id)))
800 .map(CheckpointEffect::from_effect)
801 .collect::<Vec<_>>();
802 TurnCheckpoint {
803 state: self.state.clone(),
804 pending_effects,
805 next_effect_id: self.next_effect_id,
806 messages: self.messages.iter().cloned().collect(),
807 events: self.events.as_ref().clone(),
808 mode_iteration: self.mode_iteration,
809 mode_run_offset: self.mode_run_offset,
810 cumulative_usage: self.cumulative_usage.clone(),
811 termination: self.termination.clone(),
812 synced_mode_iteration: self.synced_mode_iteration,
813 }
814 }
815
816 pub fn restore_from_checkpoint(
817 config: TurnMachineConfig<M>,
818 checkpoint: TurnCheckpoint<M>,
819 ) -> Self {
820 let active_effect_id = checkpoint.state.outstanding_effect_id();
821 let pending_effects = checkpoint
822 .pending_effects
823 .into_iter()
824 .map(CheckpointEffect::into_effect)
825 .collect::<VecDeque<_>>();
826 let active_effect_redelivery = active_effect_id.is_some()
827 && !pending_effects
828 .iter()
829 .any(|effect| effect.id() == active_effect_id);
830 Self {
831 config,
832 state: checkpoint.state,
833 pending_effects,
834 active_effect_redelivery,
835 next_effect_id: checkpoint.next_effect_id,
836 messages: MessageSequence::from_owned(checkpoint.messages),
837 events: Arc::new(checkpoint.events),
838 mode_iteration: checkpoint.mode_iteration,
839 mode_run_offset: checkpoint.mode_run_offset,
840 cumulative_usage: checkpoint.cumulative_usage,
841 termination: checkpoint.termination,
842 synced_mode_iteration: checkpoint.synced_mode_iteration,
843 }
844 }
845
846 fn driver_context(&self) -> DriverContextView<'_, M> {
847 DriverContextView {
848 config: &self.config,
849 messages: &self.messages,
850 events: self.events.as_slice(),
851 mode_iteration: self.mode_iteration,
852 mode_run_offset: self.mode_run_offset,
853 termination: &self.termination,
854 }
855 }
856
857 fn next_id(&mut self) -> EffectId {
858 let id = EffectId(self.next_effect_id);
859 self.next_effect_id += 1;
860 id
861 }
862
863 fn emit(&mut self, event: SessionEvent) {
864 self.pending_effects.push_back(Effect::Emit(event));
865 }
866
867 fn emit_progress(&mut self) {
868 self.pending_effects.push_back(Effect::Progress {
869 messages: self.messages.clone(),
870 events: Arc::clone(&self.events),
871 mode_iteration: self.mode_iteration,
872 });
873 }
874
875 pub fn fail_turn(&mut self, event: SessionEvent) {
876 self.emit(event);
877 self.finish(TurnOutcome::Stopped(TurnStop::RuntimeError));
878 }
879
880 pub fn finish_with_outcome(&mut self, outcome: TurnOutcome) {
881 self.finish(outcome);
882 }
883
884 fn finish(&mut self, outcome: TurnOutcome) {
885 self.emit(SessionEvent::TurnOutcome { outcome });
886 self.emit(SessionEvent::Done);
887 let msgs = std::mem::take(&mut self.messages);
888 let events = Arc::clone(&self.events);
889 let mode_iteration = self.mode_iteration;
890 self.state = MachineState::Finished;
891 self.pending_effects.push_back(Effect::Done {
892 messages: msgs,
893 events,
894 mode_iteration,
895 });
896 }
897
898 pub fn poll_effect(&mut self) -> Option<Effect<M>> {
901 if let Some(effect) = self.pending_effects.pop_front() {
902 return Some(effect);
903 }
904 if self.active_effect_redelivery {
905 self.active_effect_redelivery = false;
906 if let Some(effect) = self.state.outstanding_effect() {
907 return Some(effect);
908 }
909 }
910
911 match &self.state {
912 MachineState::PreparingMode => {
913 self.prepare_mode();
914 self.pending_effects.pop_front()
915 }
916 MachineState::PrepareIteration => {
917 self.prepare_mode_iteration();
918 self.pending_effects.pop_front()
919 }
920 _ => None,
921 }
922 }
923
924 fn prepare_mode(&mut self) {
927 if self.config.sync_execution_surface {
928 let id = self.next_id();
929 self.state = MachineState::WaitingExecutionSurface {
930 effect_id: id,
931 update_machine_config: false,
932 };
933 self.pending_effects
934 .push_back(Effect::SyncExecutionSurface {
935 id,
936 update_machine_config: false,
937 });
938 return;
939 }
940
941 self.prepare_mode_iteration();
942 }
943
944 fn prepare_mode_iteration(&mut self) {
945 if self.config.sync_execution_surface
946 && self.synced_mode_iteration != Some(self.mode_iteration)
947 {
948 let id = self.next_id();
949 self.state = MachineState::WaitingExecutionSurface {
950 effect_id: id,
951 update_machine_config: true,
952 };
953 self.pending_effects
954 .push_back(Effect::SyncExecutionSurface {
955 id,
956 update_machine_config: true,
957 });
958 return;
959 }
960 let actions = {
961 let driver = Arc::clone(&self.config.protocol_driver);
962 let ctx = self.driver_context();
963 driver.prepare_mode_iteration(ctx)
964 };
965 self.apply_actions(actions);
966 }
967
968 fn start_llm_request(
969 &mut self,
970 request: Arc<LlmRequest>,
971 driver_state: Option<M::DriverState>,
972 ) {
973 let tool_list = self
974 .config
975 .tool_specs
976 .iter()
977 .map(|tool| tool.name.as_str())
978 .collect::<Vec<_>>()
979 .join(", ");
980 self.emit(SessionEvent::LlmRequest {
981 mode_iteration: self.mode_iteration,
982 message_count: self.messages.len(),
983 tool_list,
984 });
985
986 let id = self.next_id();
987 self.state = MachineState::WaitingLlm {
988 effect_id: id,
989 request: Arc::clone(&request),
990 driver_state,
991 };
992 self.pending_effects
993 .push_back(Effect::LlmCall { id, request });
994 }
995
996 fn start_tool_calls(&mut self, calls: Vec<PendingToolCall>) {
997 let effect_id = self.next_id();
998 self.state = MachineState::WaitingTools {
999 effect_id,
1000 calls: calls.clone(),
1001 };
1002 self.pending_effects.push_back(Effect::ToolCalls {
1003 id: effect_id,
1004 calls,
1005 });
1006 }
1007
1008 fn start_exec(&mut self, code: String, driver_state: M::DriverState) {
1009 let effect_id = self.next_id();
1010 self.state = MachineState::WaitingExec {
1011 effect_id,
1012 code: code.clone(),
1013 driver_state,
1014 };
1015 self.pending_effects.push_back(Effect::ExecCode {
1016 id: effect_id,
1017 code,
1018 });
1019 }
1020
1021 fn append_event(&mut self, event: SessionEventRecord<M::Event>) {
1022 match event {
1023 SessionEventRecord::Conversation(record) => {
1024 Arc::make_mut(&mut self.events)
1025 .push(SessionEventRecord::Conversation(record.clone()));
1026 self.messages.push(record.to_message());
1027 }
1028 SessionEventRecord::Tool(ToolEvent::Invocation { stable_key, record }) => {
1029 Arc::make_mut(&mut self.events).push(SessionEventRecord::Tool(
1030 ToolEvent::Invocation { stable_key, record },
1031 ));
1032 }
1033 SessionEventRecord::Mode(mode_event) => {
1034 Arc::make_mut(&mut self.events).push(SessionEventRecord::Mode(mode_event));
1035 }
1036 SessionEventRecord::StateSnapshot(snapshot) => {
1037 Arc::make_mut(&mut self.events).push(SessionEventRecord::StateSnapshot(snapshot));
1038 }
1039 }
1040 }
1041
1042 pub fn apply_actions(&mut self, actions: Vec<DriverAction<M>>) {
1043 let mut progress_dirty = false;
1044 for action in actions {
1045 match action {
1046 DriverAction::Emit(event) => self.emit(event),
1047 DriverAction::AppendEvents(events) => {
1048 if !events.is_empty() {
1049 for event in events {
1050 self.append_event(event);
1051 }
1052 progress_dirty = true;
1053 }
1054 }
1055 DriverAction::StartLlm {
1056 request,
1057 driver_state,
1058 } => self.start_llm_request(request, driver_state),
1059 DriverAction::StartTools { calls } => self.start_tool_calls(calls),
1060 DriverAction::StartExec { code, driver_state } => {
1061 self.start_exec(code, driver_state)
1062 }
1063 DriverAction::StartCheckpoint {
1064 checkpoint,
1065 on_empty,
1066 } => self.request_checkpoint(checkpoint, on_empty),
1067 DriverAction::AdvanceModeIteration => {
1068 self.mode_iteration += 1;
1069 self.synced_mode_iteration = None;
1070 progress_dirty = true;
1071 }
1072 DriverAction::ScheduleTurnLimitFinal => {
1073 self.termination.maybe_schedule_turn_limit_final(
1074 self.mode_iteration,
1075 self.mode_run_offset,
1076 self.config.max_turns,
1077 self.messages.make_mut(),
1078 );
1079 progress_dirty = true;
1080 }
1081 DriverAction::Finish(outcome) => {
1082 if progress_dirty {
1083 self.emit_progress();
1084 progress_dirty = false;
1085 }
1086 self.finish(outcome);
1087 break;
1088 }
1089 }
1090 }
1091 if progress_dirty {
1092 self.emit_progress();
1093 }
1094 }
1095
1096 pub fn handle_response(&mut self, response: Response) {
1098 self.active_effect_redelivery = false;
1099 match response {
1100 Response::ExecutionSurfaceSynced { id, result } => {
1101 self.handle_execution_surface_synced(id, result)
1102 }
1103 Response::LlmComplete {
1104 id,
1105 result,
1106 text_streamed,
1107 } => self.handle_llm_complete(id, result, text_streamed),
1108 Response::ToolResults { id, results } => self.handle_tool_results(id, results),
1109 Response::ExecResult { id, result } => self.handle_exec_result(id, result),
1110 Response::Checkpoint {
1111 id,
1112 messages,
1113 transient_messages,
1114 } => self.handle_checkpoint(id, messages, transient_messages),
1115 Response::Timeout { id } => self.handle_timeout(id),
1116 }
1117 }
1118
1119 fn request_checkpoint(&mut self, checkpoint: CheckpointKind, on_empty: CheckpointResumeAction) {
1120 let id = self.next_id();
1121 self.state = MachineState::WaitingCheckpoint {
1122 effect_id: id,
1123 checkpoint,
1124 on_empty,
1125 };
1126 self.pending_effects
1127 .push_back(Effect::Checkpoint { id, checkpoint });
1128 }
1129
1130 fn handle_execution_surface_synced(
1131 &mut self,
1132 id: EffectId,
1133 result: Result<Option<ExecutionSurfaceSync>, String>,
1134 ) {
1135 let (waiting_id, waiting_update_machine_config) =
1136 match std::mem::replace(&mut self.state, MachineState::Finished) {
1137 MachineState::WaitingExecutionSurface {
1138 effect_id,
1139 update_machine_config,
1140 } => (effect_id, update_machine_config),
1141 other => {
1142 self.state = other;
1143 return;
1144 }
1145 };
1146 if waiting_id != id {
1147 self.state = MachineState::WaitingExecutionSurface {
1148 effect_id: waiting_id,
1149 update_machine_config: waiting_update_machine_config,
1150 };
1151 return;
1152 }
1153
1154 match result {
1155 Ok(update) => {
1156 if let Some(update) = update {
1157 self.config.system_prompt = update.system_prompt;
1158 self.config.tool_specs = update.tool_specs;
1159 }
1160 self.synced_mode_iteration = Some(self.mode_iteration);
1161 self.state = MachineState::PrepareIteration;
1162 }
1163 Err(error) => {
1164 self.fail_turn(make_error_event(
1165 "execution_surface",
1166 Some("reconfigure_failed"),
1167 format!("Failed to refresh execution surface: {error}"),
1168 Some(error),
1169 ));
1170 }
1171 }
1172 }
1173
1174 fn append_checkpoint_messages(&mut self, plugin_messages: &[PluginMessage], transient: bool) {
1175 let appended = plugin_messages
1176 .iter()
1177 .filter(|message| matches!(message.role, MessageRole::User | MessageRole::System))
1178 .map(|message| {
1179 let message_id = fresh_message_id();
1180 let mut parts = if message.parts.is_empty() {
1181 vec![Part {
1182 id: format!("{message_id}.p0"),
1183 kind: PartKind::Text,
1184 content: message.content.clone(),
1185 attachment: None,
1186 tool_call_id: None,
1187 tool_name: None,
1188 tool_replay: None,
1189 prune_state: PruneState::Intact,
1190 reasoning_meta: None,
1191 response_meta: None,
1192 }]
1193 } else {
1194 message.parts.clone()
1195 };
1196 reassign_part_ids(&message_id, &mut parts);
1197 Message {
1198 id: message_id.clone(),
1199 role: message.role,
1200 parts: Arc::new(parts),
1201 origin: Some(MessageOrigin::Plugin {
1202 plugin_id: "plugin".to_string(),
1203 transient,
1204 }),
1205 }
1206 })
1207 .collect::<Vec<_>>();
1208 if !appended.is_empty() {
1209 self.messages.extend(appended);
1210 }
1211 }
1212
1213 fn handle_checkpoint(
1214 &mut self,
1215 id: EffectId,
1216 messages: Vec<PluginMessage>,
1217 transient_messages: Vec<PluginMessage>,
1218 ) {
1219 let (effect_id, checkpoint, on_empty) =
1220 match std::mem::replace(&mut self.state, MachineState::Finished) {
1221 MachineState::WaitingCheckpoint {
1222 effect_id,
1223 checkpoint,
1224 on_empty,
1225 } => (effect_id, checkpoint, on_empty),
1226 other => {
1227 self.state = other;
1228 return;
1229 }
1230 };
1231 if effect_id != id {
1232 self.state = MachineState::WaitingCheckpoint {
1233 effect_id,
1234 checkpoint,
1235 on_empty,
1236 };
1237 return;
1238 }
1239
1240 if !messages.is_empty() || !transient_messages.is_empty() {
1241 self.append_checkpoint_messages(&messages, false);
1242 self.append_checkpoint_messages(&transient_messages, true);
1243 if matches!(checkpoint, CheckpointKind::BeforeCompletion) {
1244 self.mode_iteration += 1;
1245 if self.termination.should_force_exit_after_grace_turn() {
1246 self.emit_progress();
1247 self.finish(TurnOutcome::Stopped(TurnStop::MaxTurns));
1248 return;
1249 }
1250 self.termination.maybe_schedule_turn_limit_final(
1251 self.mode_iteration,
1252 self.mode_run_offset,
1253 self.config.max_turns,
1254 self.messages.make_mut(),
1255 );
1256 }
1257 self.state = MachineState::PrepareIteration;
1258 self.emit_progress();
1259 return;
1260 }
1261
1262 match on_empty {
1263 CheckpointResumeAction::PrepareIteration => {
1264 self.state = MachineState::PrepareIteration;
1265 }
1266 CheckpointResumeAction::Finish(outcome) => self.finish(outcome),
1267 }
1268 }
1269
1270 fn take_waiting_llm_state(&mut self, id: EffectId) -> Option<WaitingLlmState<M>> {
1271 match std::mem::replace(&mut self.state, MachineState::Finished) {
1272 MachineState::WaitingLlm {
1273 effect_id,
1274 request,
1275 driver_state,
1276 } if effect_id == id => Some(WaitingLlmState {
1277 request,
1278 driver_state,
1279 }),
1280 other => {
1281 self.state = other;
1282 None
1283 }
1284 }
1285 }
1286
1287 fn handle_llm_complete(
1288 &mut self,
1289 id: EffectId,
1290 result: Result<LlmResponse, LlmCallError>,
1291 text_streamed: bool,
1292 ) {
1293 let Some(waiting) = self.take_waiting_llm_state(id) else {
1294 return;
1295 };
1296 match result {
1297 Err(error) => {
1298 self.emit_llm_error(error);
1299 }
1300 Ok(llm_response) => {
1301 self.record_llm_usage(&llm_response, self.llm_response_text(&llm_response));
1302 if self.handle_terminal_llm_response(&llm_response, text_streamed) {
1303 return;
1304 }
1305 let actions = {
1306 let driver = Arc::clone(&self.config.protocol_driver);
1307 let ctx = self.driver_context();
1308 driver.handle_llm_success(ctx, waiting, llm_response, text_streamed)
1309 };
1310 self.apply_actions(actions);
1311 }
1312 }
1313 }
1314
1315 fn handle_terminal_llm_response(
1316 &mut self,
1317 llm_response: &LlmResponse,
1318 text_streamed: bool,
1319 ) -> bool {
1320 let outcome = match llm_response.terminal_reason {
1321 LlmTerminalReason::OutputLimit => TurnOutcome::Stopped(TurnStop::Incomplete),
1322 LlmTerminalReason::ContextOverflow => TurnOutcome::Stopped(TurnStop::ProviderError),
1323 LlmTerminalReason::ContentFilter => TurnOutcome::Stopped(TurnStop::ProviderError),
1324 LlmTerminalReason::ProviderError => TurnOutcome::Stopped(TurnStop::ProviderError),
1325 LlmTerminalReason::Cancelled => TurnOutcome::Stopped(TurnStop::Cancelled),
1326 LlmTerminalReason::Stop | LlmTerminalReason::ToolUse | LlmTerminalReason::Unknown => {
1327 return false;
1328 }
1329 };
1330
1331 if !text_streamed && !llm_response.full_text.is_empty() {
1332 self.emit(SessionEvent::TextDelta {
1333 content: llm_response.full_text.clone(),
1334 });
1335 }
1336 self.emit(SessionEvent::LlmResponse {
1337 mode_iteration: self.mode_iteration,
1338 content: llm_response.full_text.clone(),
1339 duration_ms: 0,
1340 });
1341 let reason = llm_response.terminal_reason;
1342 let diagnostic = llm_response
1343 .terminal_diagnostic
1344 .clone()
1345 .unwrap_or_else(|| format!("Model call ended with terminal reason {reason:?}."));
1346 self.emit(SessionEvent::Error {
1347 message: diagnostic.clone(),
1348 envelope: Some(crate::session_model::make_error_envelope(
1349 "llm_provider",
1350 Some(reason.code()),
1351 Some(reason),
1352 diagnostic,
1353 None,
1354 )),
1355 });
1356 self.finish(outcome);
1357 true
1358 }
1359
1360 fn llm_response_text<'a>(&self, llm_response: &'a LlmResponse) -> &'a str {
1361 &llm_response.full_text
1362 }
1363
1364 fn llm_response_debug_parts(&self, llm_response: &LlmResponse) -> Option<Value> {
1365 let parts = llm_response
1366 .parts
1367 .iter()
1368 .filter_map(|part| match part {
1369 LlmOutputPart::Text { text, .. } if !text.is_empty() => Some(serde_json::json!({
1370 "type": "text",
1371 "text": text,
1372 })),
1373 LlmOutputPart::Text { .. } => None,
1374 LlmOutputPart::Reasoning {
1375 text,
1376 replay,
1377 } => Some(serde_json::json!({
1378 "type": "reasoning",
1379 "id": replay.as_ref().and_then(|meta| meta.item_id.as_ref()),
1380 "summary": replay.as_ref().map(|meta| &meta.summary),
1381 "text": text,
1382 "has_encrypted": replay.as_ref().is_some_and(|meta| meta.encrypted_content.is_some() || meta.signature.is_some()),
1383 "redacted": replay.as_ref().is_some_and(|meta| meta.redacted),
1384 })),
1385 LlmOutputPart::ToolCall {
1386 call_id,
1387 tool_name,
1388 input_json,
1389 replay,
1390 } => Some(serde_json::json!({
1391 "type": "tool_call",
1392 "call_id": call_id,
1393 "tool_name": tool_name,
1394 "input_json": input_json,
1395 "id": replay.as_ref().and_then(|meta| meta.item_id.as_ref()),
1396 "has_opaque": replay.as_ref().is_some_and(|meta| meta.opaque.is_some()),
1397 })),
1398 })
1399 .collect::<Vec<_>>();
1400 (!parts.is_empty()).then_some(Value::Array(parts))
1401 }
1402
1403 fn record_llm_usage(&mut self, llm_response: &LlmResponse, response_text: &str) {
1404 let usage = token_usage_from_llm_usage(&llm_response.usage);
1405 self.cumulative_usage.add(&usage);
1406 self.emit(SessionEvent::TokenUsage {
1407 mode_iteration: self.mode_iteration,
1408 usage: usage.clone(),
1409 cumulative: self.cumulative_usage.clone(),
1410 });
1411 if self.config.emit_llm_trace {
1412 let response_parts = self.llm_response_debug_parts(llm_response);
1413 self.pending_effects.push_back(Effect::Log {
1414 event: LogEvent::LlmDebug {
1415 session_id: self.config.session_id.clone(),
1416 mode_iteration: self.mode_iteration,
1417 usage,
1418 provider_usage: llm_response.provider_usage.clone(),
1419 request_body: llm_response.request_body.clone(),
1420 response_text: response_text.to_string(),
1421 response_parts,
1422 },
1423 });
1424 }
1425 }
1426
1427 fn record_llm_error(&mut self, error: &LlmCallError) {
1428 if self.config.emit_llm_trace {
1429 self.pending_effects.push_back(Effect::Log {
1430 event: LogEvent::LlmError {
1431 session_id: self.config.session_id.clone(),
1432 mode_iteration: self.mode_iteration,
1433 request_body: error.request_body.clone(),
1434 message: error.message.clone(),
1435 retryable: error.retryable,
1436 raw: error.raw.clone(),
1437 code: error.code.clone(),
1438 terminal_reason: error.terminal_reason,
1439 },
1440 });
1441 }
1442 }
1443
1444 fn emit_llm_error(&mut self, error: LlmCallError) {
1445 self.record_llm_error(&error);
1446 self.emit(SessionEvent::Error {
1447 message: format!("LLM error: {}", error.message),
1448 envelope: Some(crate::session_model::make_error_envelope(
1449 "llm_provider",
1450 error.code.as_deref(),
1451 Some(error.terminal_reason),
1452 format!("LLM error: {}", error.message),
1453 error.raw,
1454 )),
1455 });
1456 self.finish(TurnOutcome::Stopped(TurnStop::ProviderError));
1457 }
1458
1459 fn handle_tool_results(&mut self, id: EffectId, completed: Vec<CompletedToolCall>) {
1460 let (waiting_effect_id, waiting_calls) =
1461 match std::mem::replace(&mut self.state, MachineState::Finished) {
1462 MachineState::WaitingTools { effect_id, calls } => (effect_id, calls),
1463 other => {
1464 self.state = other;
1465 return;
1466 }
1467 };
1468
1469 if waiting_effect_id != id {
1470 self.state = MachineState::WaitingTools {
1471 effect_id: waiting_effect_id,
1472 calls: waiting_calls,
1473 };
1474 return;
1475 }
1476
1477 for outcome in &completed {
1478 self.emit(SessionEvent::ToolCall {
1479 call_id: Some(outcome.call_id.clone()),
1480 name: outcome.tool_name.clone(),
1481 args: outcome.args.clone(),
1482 output: outcome.output.clone(),
1483 duration_ms: outcome.duration_ms,
1484 });
1485 }
1486
1487 let actions = {
1488 let driver = Arc::clone(&self.config.protocol_driver);
1489 let ctx = self.driver_context();
1490 driver.handle_tool_results(ctx, completed)
1491 };
1492 self.apply_actions(actions);
1493 }
1494
1495 fn take_waiting_exec_state(&mut self, id: EffectId) -> Option<WaitingExecState<M>> {
1496 match std::mem::replace(&mut self.state, MachineState::Finished) {
1497 MachineState::WaitingExec {
1498 effect_id,
1499 code: _,
1500 driver_state,
1501 } if effect_id == id => Some(WaitingExecState { driver_state }),
1502 other => {
1503 self.state = other;
1504 None
1505 }
1506 }
1507 }
1508
1509 fn handle_exec_result(&mut self, id: EffectId, result: Result<crate::ExecResponse, String>) {
1510 let Some(waiting) = self.take_waiting_exec_state(id) else {
1511 return;
1512 };
1513 let actions = {
1514 let driver = Arc::clone(&self.config.protocol_driver);
1515 let ctx = self.driver_context();
1516 driver.handle_exec_result(ctx, waiting, result)
1517 };
1518 self.apply_actions(actions);
1519 }
1520
1521 fn handle_timeout(&mut self, _id: EffectId) {}
1522}
1523
1524fn token_usage_from_llm_usage(usage: &crate::llm::types::LlmUsage) -> TokenUsage {
1525 TokenUsage {
1526 input_tokens: usage.input_tokens,
1527 output_tokens: usage.output_tokens,
1528 cached_input_tokens: usage.cached_input_tokens,
1529 reasoning_tokens: usage.reasoning_tokens,
1530 }
1531}
1532
1533#[cfg(test)]
1534mod tests;