Skip to main content

ralph_workflow/app/event_loop/
config.rs

1//! Event loop configuration and initialization.
2//!
3//! This module defines configuration types and initialization logic for the
4//! reducer-based event loop.
5
6use crate::phases::PhaseContext;
7use crate::reducer::event::PipelinePhase;
8use crate::reducer::state::ContinuationState;
9use crate::reducer::PipelineState;
10
11/// Create initial pipeline state with continuation limits from config.
12///
13/// This function creates a `PipelineState` with XSD retry and continuation limits
14/// loaded from the config, ensuring these values are available for the reducer
15/// to make deterministic retry decisions.
16pub fn create_initial_state_with_config(ctx: &PhaseContext<'_>) -> PipelineState {
17    // Config semantics: max_dev_continuations counts continuation attempts *beyond*
18    // the initial attempt. ContinuationState::max_continue_count semantics are
19    // "maximum total attempts including initial".
20
21    // CRITICAL: max_dev_continuations should always be Some() when loaded via config_from_unified().
22    // The serde defaults in UnifiedConfig ensure these fields are never missing.
23    // The unwrap_or() here is a defensive fallback for edge cases:
24    // - Config::default() or Config::test_default()
25    // - Direct Config construction in tests without going through config_from_unified()
26    //
27    // In debug builds, we assert that the value is Some() to catch config loading bugs early.
28    debug_assert!(
29        ctx.config.max_dev_continuations.is_some(),
30        "BUG: max_dev_continuations is None when it should always have a value from config loading. \
31         This indicates config_from_unified() did not properly set the field, or Config was \
32         constructed directly without defaults."
33    );
34    debug_assert!(
35        ctx.config.max_xsd_retries.is_some(),
36        "BUG: max_xsd_retries is None when it should always have a value from config loading."
37    );
38    debug_assert!(
39        ctx.config.max_same_agent_retries.is_some(),
40        "BUG: max_same_agent_retries is None when it should always have a value from config loading."
41    );
42
43    // CRITICAL SAFETY MECHANISM: Apply unconditional default of 2 (3 total attempts) when None.
44    // This ensures bounded continuation even if Config was constructed without going through
45    // config_from_unified() (e.g., Config::default(), tests). This is the PRIMARY DEFENSE
46    // against infinite continuation loops when max_dev_continuations is missing.
47    //
48    // VERIFIED FIX: This unwrap_or(2) is what prevents the infinite loop bug reported by user.
49    // With max_dev_continuations = 2:
50    // - max_continue_count = 1 + 2 = 3
51    // - Attempts 0, 1, 2 are allowed (3 total)
52    // - Attempt 3+ is exhausted via OutcomeApplied check: (attempt + 1 >= 3)
53    //
54    // The defensive check in trigger_continuation provides additional safety by preventing
55    // counter increment when next_attempt >= max_continue_count.
56    let max_dev_continuations = ctx.config.max_dev_continuations.unwrap_or(2);
57    let max_continue_count = 1 + max_dev_continuations;
58
59    // SAFETY ASSERTION: when max_dev_continuations is absent, unwrap_or(2)
60    // must produce the default total-attempts cap of 3.
61    if ctx.config.max_dev_continuations.is_none() {
62        debug_assert_eq!(
63            max_continue_count, 3,
64            "BUG: missing max_dev_continuations must default to 3 total attempts. Got: {max_continue_count}"
65        );
66    }
67
68    let continuation = ContinuationState::with_limits(
69        ctx.config.max_xsd_retries.unwrap_or(10),
70        max_continue_count,
71        ctx.config.max_same_agent_retries.unwrap_or(2),
72    );
73    let mut state = PipelineState::initial_with_continuation(
74        ctx.config.developer_iters,
75        ctx.config.reviewer_reviews,
76        &continuation,
77    );
78
79    // Inject a checkpoint-safe (redacted) view of runtime cloud config.
80    // This ensures pure orchestration can derive cloud effects when enabled,
81    // without ever storing secrets in reducer state.
82    state.cloud = crate::config::CloudStateConfig::from(ctx.cloud);
83
84    state
85}
86
87/// Overlay checkpoint-derived progress onto a config-derived base state.
88///
89/// This is used for resume: budgets/limits remain config-driven (from `base_state`),
90/// while progress counters and histories are restored from the checkpoint-migrated
91/// `PipelineState`.
92///
93/// NOTE: `base_state.cloud` is intentionally preserved (it is derived from
94/// runtime env and is already redacted/credential-free).
95pub fn overlay_checkpoint_progress_onto_base_state(
96    base_state: &mut PipelineState,
97    migrated: PipelineState,
98    execution_history_limit: usize,
99) {
100    let migrated_execution_history = migrated.execution_history().clone();
101
102    base_state.phase = migrated.phase;
103    base_state.iteration = migrated.iteration;
104    base_state.total_iterations = migrated.total_iterations;
105    base_state.reviewer_pass = migrated.reviewer_pass;
106    base_state.total_reviewer_passes = migrated.total_reviewer_passes;
107    base_state.rebase = migrated.rebase;
108    base_state
109        .replace_execution_history_bounded(migrated_execution_history, execution_history_limit);
110    base_state.prompt_inputs = migrated.prompt_inputs;
111    base_state.prompt_permissions = migrated.prompt_permissions;
112    base_state.prompt_history = migrated.prompt_history;
113    base_state.metrics = migrated.metrics;
114
115    // Restore resume-critical recovery/dev-fix/interrupt fields.
116    // These are checkpoint-derived and must not be dropped, otherwise resumed runs
117    // can behave as if recovery/dev-fix state never happened.
118    base_state.recovery_epoch = migrated.recovery_epoch;
119    base_state.recovery_escalation_level = migrated.recovery_escalation_level;
120    base_state.dev_fix_attempt_count = migrated.dev_fix_attempt_count;
121    base_state.failed_phase_for_recovery = migrated.failed_phase_for_recovery;
122    base_state.interrupted_by_user = migrated.interrupted_by_user;
123
124    // Restore cloud resume continuity from checkpoint-migrated state.
125    // Keep `base_state.cloud` (runtime env-derived, redacted).
126    base_state.pending_push_commit = migrated.pending_push_commit;
127    base_state.git_auth_configured = migrated.git_auth_configured;
128    base_state.pr_created = migrated.pr_created;
129    base_state.pr_url = migrated.pr_url;
130    base_state.pr_number = migrated.pr_number;
131    base_state.push_count = migrated.push_count;
132    base_state.push_retry_count = migrated.push_retry_count;
133    base_state.last_push_error = migrated.last_push_error;
134    base_state.unpushed_commits = migrated.unpushed_commits;
135    base_state.last_pushed_commit = migrated.last_pushed_commit;
136}
137
/// Hard cap on the number of iterations the main event loop may perform.
///
/// This is a backstop against infinite loops, not a tuning knob: a healthy
/// pipeline is expected to finish long before reaching it. Hitting the cap
/// points to either a bug in the reducer logic or an extremely complex project.
///
/// NOTE: `1_000_000` may still be too small for runs that make very slow
/// progress. If the cap is reached in practice, prefer making it configurable
/// and/or investigating why the reducer fails to converge.
pub const MAX_EVENT_LOOP_ITERATIONS: usize = 1_000_000;
148
#[cfg(test)]
mod resume_overlay_tests {
    use super::overlay_checkpoint_progress_onto_base_state;
    use crate::config::{CloudStateConfig, GitAuthStateMethod, GitRemoteStateConfig};
    use crate::reducer::event::PipelinePhase;
    use crate::reducer::PipelineState;

    /// Execution-history bound handed to the overlay in every test here.
    const HISTORY_LIMIT: usize = 1000;

    /// Enabled cloud config standing in for the runtime (env-derived) view.
    fn runtime_cloud_config() -> CloudStateConfig {
        let git_remote = GitRemoteStateConfig {
            auth_method: GitAuthStateMethod::Token {
                username: "x-access-token".to_string(),
            },
            push_branch: "env_branch".to_string(),
            create_pr: true,
            pr_title_template: None,
            pr_body_template: None,
            pr_base_branch: None,
            force_push: false,
            remote_name: "origin".to_string(),
        };

        CloudStateConfig {
            enabled: true,
            api_url: None,
            run_id: Some("run_from_env".to_string()),
            heartbeat_interval_secs: 30,
            graceful_degradation: true,
            git_remote,
        }
    }

    /// Push/PR bookkeeping must come from the checkpoint, while the runtime
    /// cloud config on the base state must survive the overlay untouched.
    #[test]
    fn resume_overlay_restores_cloud_resume_fields_but_preserves_runtime_cloud() {
        let mut resumed = PipelineState::initial(3, 2);
        resumed.cloud = runtime_cloud_config();

        let mut checkpoint = PipelineState::initial(999, 999);
        checkpoint.cloud = CloudStateConfig::disabled();
        checkpoint.pending_push_commit = Some("abc123".to_string());
        checkpoint.git_auth_configured = true;
        checkpoint.pr_created = true;
        checkpoint.pr_url = Some("https://example.com/pr/1".to_string());
        checkpoint.pr_number = Some(1);
        checkpoint.push_count = 7;
        checkpoint.push_retry_count = 2;
        checkpoint.last_push_error = Some("push failed".to_string());
        checkpoint.unpushed_commits = vec!["deadbeef".to_string()];
        checkpoint.last_pushed_commit = Some("beadfeed".to_string());

        overlay_checkpoint_progress_onto_base_state(&mut resumed, checkpoint, HISTORY_LIMIT);

        // The runtime (env-derived) redacted config is still in place.
        assert!(resumed.cloud.enabled);
        assert_eq!(resumed.cloud.run_id.as_deref(), Some("run_from_env"));
        assert_eq!(resumed.cloud.git_remote.push_branch.as_str(), "env_branch");

        // The cloud resume state came from the checkpoint.
        assert_eq!(resumed.pending_push_commit.as_deref(), Some("abc123"));
        assert!(resumed.git_auth_configured);
        assert!(resumed.pr_created);
        assert_eq!(resumed.pr_url.as_deref(), Some("https://example.com/pr/1"));
        assert_eq!(resumed.pr_number, Some(1));
        assert_eq!(resumed.push_count, 7);
        assert_eq!(resumed.push_retry_count, 2);
        assert_eq!(resumed.last_push_error.as_deref(), Some("push failed"));
        assert_eq!(resumed.unpushed_commits, vec!["deadbeef".to_string()]);
        assert_eq!(resumed.last_pushed_commit.as_deref(), Some("beadfeed"));
    }

    /// Recovery, dev-fix and user-interrupt fields must be carried over from
    /// the checkpoint-migrated state.
    #[test]
    fn resume_overlay_restores_recovery_and_interrupt_fields() {
        let mut resumed = PipelineState::initial(3, 2);

        let mut checkpoint = PipelineState::initial(999, 999);
        checkpoint.dev_fix_attempt_count = 42;
        checkpoint.recovery_epoch = 7;
        checkpoint.recovery_escalation_level = 3;
        checkpoint.failed_phase_for_recovery = Some(PipelinePhase::Review);
        checkpoint.interrupted_by_user = true;

        overlay_checkpoint_progress_onto_base_state(&mut resumed, checkpoint, HISTORY_LIMIT);

        assert_eq!(resumed.dev_fix_attempt_count, 42);
        assert_eq!(resumed.recovery_epoch, 7);
        assert_eq!(resumed.recovery_escalation_level, 3);
        assert_eq!(resumed.failed_phase_for_recovery, Some(PipelinePhase::Review));
        assert!(
            resumed.interrupted_by_user,
            "interrupted_by_user must be restored from the migrated checkpoint state"
        );
    }
}
235
/// Settings that govern a single run of the event loop.
#[derive(Debug, Clone, Copy)]
pub struct EventLoopConfig {
    /// Upper bound on loop iterations; guards against infinite loops.
    pub max_iterations: usize,
}
242
243/// Result of event loop execution.
244#[derive(Debug, Clone)]
245pub struct EventLoopResult {
246    /// Whether pipeline completed successfully.
247    pub completed: bool,
248    /// Total events processed.
249    pub events_processed: usize,
250    /// Final reducer phase when the loop stopped.
251    pub final_phase: PipelinePhase,
252    /// Final pipeline state (for metrics and summary).
253    pub final_state: PipelineState,
254}