ralph_workflow/app/event_loop/
config.rs1use crate::phases::PhaseContext;
7use crate::reducer::event::PipelinePhase;
8use crate::reducer::state::ContinuationState;
9use crate::reducer::PipelineState;
10
11pub fn create_initial_state_with_config(ctx: &PhaseContext<'_>) -> PipelineState {
17 debug_assert!(
29 ctx.config.max_dev_continuations.is_some(),
30 "BUG: max_dev_continuations is None when it should always have a value from config loading. \
31 This indicates config_from_unified() did not properly set the field, or Config was \
32 constructed directly without defaults."
33 );
34 debug_assert!(
35 ctx.config.max_xsd_retries.is_some(),
36 "BUG: max_xsd_retries is None when it should always have a value from config loading."
37 );
38 debug_assert!(
39 ctx.config.max_same_agent_retries.is_some(),
40 "BUG: max_same_agent_retries is None when it should always have a value from config loading."
41 );
42
43 let max_dev_continuations = ctx.config.max_dev_continuations.unwrap_or(2);
57 let max_continue_count = 1 + max_dev_continuations;
58
59 if ctx.config.max_dev_continuations.is_none() {
62 debug_assert_eq!(
63 max_continue_count, 3,
64 "BUG: missing max_dev_continuations must default to 3 total attempts. Got: {max_continue_count}"
65 );
66 }
67
68 let continuation = ContinuationState::with_limits(
69 ctx.config.max_xsd_retries.unwrap_or(10),
70 max_continue_count,
71 ctx.config.max_same_agent_retries.unwrap_or(2),
72 );
73 let mut state = PipelineState::initial_with_continuation(
74 ctx.config.developer_iters,
75 ctx.config.reviewer_reviews,
76 &continuation,
77 );
78
79 state.cloud = crate::config::CloudStateConfig::from(ctx.cloud);
83
84 state
85}
86
87pub fn overlay_checkpoint_progress_onto_base_state(
96 base_state: &mut PipelineState,
97 migrated: PipelineState,
98 execution_history_limit: usize,
99) {
100 let migrated_execution_history = migrated.execution_history().clone();
101
102 base_state.phase = migrated.phase;
103 base_state.iteration = migrated.iteration;
104 base_state.total_iterations = migrated.total_iterations;
105 base_state.reviewer_pass = migrated.reviewer_pass;
106 base_state.total_reviewer_passes = migrated.total_reviewer_passes;
107 base_state.rebase = migrated.rebase;
108 base_state
109 .replace_execution_history_bounded(migrated_execution_history, execution_history_limit);
110 base_state.prompt_inputs = migrated.prompt_inputs;
111 base_state.prompt_permissions = migrated.prompt_permissions;
112 base_state.prompt_history = migrated.prompt_history;
113 base_state.metrics = migrated.metrics;
114
115 base_state.recovery_epoch = migrated.recovery_epoch;
119 base_state.recovery_escalation_level = migrated.recovery_escalation_level;
120 base_state.dev_fix_attempt_count = migrated.dev_fix_attempt_count;
121 base_state.failed_phase_for_recovery = migrated.failed_phase_for_recovery;
122 base_state.interrupted_by_user = migrated.interrupted_by_user;
123
124 base_state.pending_push_commit = migrated.pending_push_commit;
127 base_state.git_auth_configured = migrated.git_auth_configured;
128 base_state.pr_created = migrated.pr_created;
129 base_state.pr_url = migrated.pr_url;
130 base_state.pr_number = migrated.pr_number;
131 base_state.push_count = migrated.push_count;
132 base_state.push_retry_count = migrated.push_retry_count;
133 base_state.last_push_error = migrated.last_push_error;
134 base_state.unpushed_commits = migrated.unpushed_commits;
135 base_state.last_pushed_commit = migrated.last_pushed_commit;
136}
137
138pub const MAX_EVENT_LOOP_ITERATIONS: usize = 1_000_000;
148
149#[cfg(test)]
150mod resume_overlay_tests {
151 use super::overlay_checkpoint_progress_onto_base_state;
152 use crate::config::{CloudStateConfig, GitAuthStateMethod, GitRemoteStateConfig};
153 use crate::reducer::event::PipelinePhase;
154 use crate::reducer::PipelineState;
155
156 #[test]
157 fn resume_overlay_restores_cloud_resume_fields_but_preserves_runtime_cloud() {
158 let mut base = PipelineState::initial(3, 2);
159 base.cloud = CloudStateConfig {
160 enabled: true,
161 api_url: None,
162 run_id: Some("run_from_env".to_string()),
163 heartbeat_interval_secs: 30,
164 graceful_degradation: true,
165 git_remote: GitRemoteStateConfig {
166 auth_method: GitAuthStateMethod::Token {
167 username: "x-access-token".to_string(),
168 },
169 push_branch: "env_branch".to_string(),
170 create_pr: true,
171 pr_title_template: None,
172 pr_body_template: None,
173 pr_base_branch: None,
174 force_push: false,
175 remote_name: "origin".to_string(),
176 },
177 };
178
179 let mut migrated = PipelineState::initial(999, 999);
180 migrated.cloud = CloudStateConfig::disabled();
181 migrated.pending_push_commit = Some("abc123".to_string());
182 migrated.git_auth_configured = true;
183 migrated.pr_created = true;
184 migrated.pr_url = Some("https://example.com/pr/1".to_string());
185 migrated.pr_number = Some(1);
186 migrated.push_count = 7;
187 migrated.push_retry_count = 2;
188 migrated.last_push_error = Some("push failed".to_string());
189 migrated.unpushed_commits = vec!["deadbeef".to_string()];
190 migrated.last_pushed_commit = Some("beadfeed".to_string());
191
192 overlay_checkpoint_progress_onto_base_state(&mut base, migrated, 1000);
193
194 assert!(base.cloud.enabled);
196 assert_eq!(base.cloud.run_id.as_deref(), Some("run_from_env"));
197 assert_eq!(base.cloud.git_remote.push_branch.as_str(), "env_branch");
198
199 assert_eq!(base.pending_push_commit.as_deref(), Some("abc123"));
201 assert!(base.git_auth_configured);
202 assert!(base.pr_created);
203 assert_eq!(base.pr_url.as_deref(), Some("https://example.com/pr/1"));
204 assert_eq!(base.pr_number, Some(1));
205 assert_eq!(base.push_count, 7);
206 assert_eq!(base.push_retry_count, 2);
207 assert_eq!(base.last_push_error.as_deref(), Some("push failed"));
208 assert_eq!(base.unpushed_commits, vec!["deadbeef".to_string()]);
209 assert_eq!(base.last_pushed_commit.as_deref(), Some("beadfeed"));
210 }
211
212 #[test]
213 fn resume_overlay_restores_recovery_and_interrupt_fields() {
214 let mut base = PipelineState::initial(3, 2);
215
216 let mut migrated = PipelineState::initial(999, 999);
217 migrated.dev_fix_attempt_count = 42;
218 migrated.recovery_epoch = 7;
219 migrated.recovery_escalation_level = 3;
220 migrated.failed_phase_for_recovery = Some(PipelinePhase::Review);
221 migrated.interrupted_by_user = true;
222
223 overlay_checkpoint_progress_onto_base_state(&mut base, migrated, 1000);
224
225 assert_eq!(base.dev_fix_attempt_count, 42);
226 assert_eq!(base.recovery_epoch, 7);
227 assert_eq!(base.recovery_escalation_level, 3);
228 assert_eq!(base.failed_phase_for_recovery, Some(PipelinePhase::Review));
229 assert!(
230 base.interrupted_by_user,
231 "interrupted_by_user must be restored from the migrated checkpoint state"
232 );
233 }
234}
235
236#[derive(Copy, Clone, Debug)]
238pub struct EventLoopConfig {
239 pub max_iterations: usize,
241}
242
243#[derive(Debug, Clone)]
245pub struct EventLoopResult {
246 pub completed: bool,
248 pub events_processed: usize,
250 pub final_phase: PipelinePhase,
252 pub final_state: PipelineState,
254}