Skip to main content

ralph_workflow/app/
event_loop.rs

//! Event loop for the reducer-based pipeline architecture.
//!
//! This module implements the main event loop that coordinates the reducer,
//! effect handlers, and orchestration logic. It provides a unified way to
//! run the pipeline using the event-sourced architecture from RFC-004.
6
7use crate::phases::PhaseContext;
8use crate::reducer::{
9    determine_next_effect, reduce, CheckpointTrigger, EffectHandler, MainEffectHandler,
10    PipelineEvent, PipelineState,
11};
12use anyhow::Result;
13
/// Upper bound on main event loop iterations, guarding against infinite loops.
///
/// This is purely a safety net: a normal pipeline run is expected to finish
/// long before hitting it. Reaching the bound points to either a bug in the
/// reducer logic or an extremely complex project.
pub const MAX_EVENT_LOOP_ITERATIONS: usize = 1_000;
20
/// Settings that control a single run of the event loop.
#[derive(Debug, Clone)]
pub struct EventLoopConfig {
    /// Hard cap on loop iterations; the loop stops once this many events have
    /// been processed, which prevents infinite loops.
    pub max_iterations: usize,
    /// When `true`, checkpointing is enabled while the event loop runs.
    pub enable_checkpointing: bool,
}
29
/// Outcome reported by the event loop once it stops running.
#[derive(Clone, Debug)]
pub struct EventLoopResult {
    /// `true` when the pipeline completed successfully.
    pub completed: bool,
    /// Count of events that were processed in total.
    pub events_processed: usize,
}
38
39/// Run the main event loop for the reducer-based pipeline.
40///
41/// This function orchestrates pipeline execution by repeatedly:
42/// 1. Determining the next effect based on the current state
43/// 2. Executing the effect through the effect handler (which performs side effects)
44/// 3. Applying the resulting event to state through the reducer (pure function)
45/// 4. Repeating until a terminal state is reached or max iterations exceeded
46///
47/// The entire event loop is wrapped in panic recovery to ensure the pipeline
48/// never crashes due to agent failures (including segmentation faults).
49///
50/// # Arguments
51///
52/// * `ctx` - Phase context for effect handlers
53/// * `initial_state` - Optional initial state (if None, creates a new state)
54/// * `config` - Event loop configuration
55///
56/// # Returns
57///
58/// Returns the event loop result containing the completion status and final state.
59pub fn run_event_loop(
60    ctx: &mut PhaseContext<'_>,
61    initial_state: Option<PipelineState>,
62    config: EventLoopConfig,
63) -> Result<EventLoopResult> {
64    let loop_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
65        run_event_loop_internal(ctx, initial_state.clone(), config)
66    }));
67
68    match loop_result {
69        Ok(result) => result,
70        Err(_) => {
71            ctx.logger.error("Event loop recovered from panic");
72            let _fallback_state = initial_state.unwrap_or_else(|| {
73                PipelineState::initial(ctx.config.developer_iters, ctx.config.reviewer_reviews)
74            });
75
76            Ok(EventLoopResult {
77                completed: false,
78                events_processed: 0,
79            })
80        }
81    }
82}
83
84/// Run the event loop with a custom effect handler.
85///
86/// This variant allows injecting a custom effect handler for testing.
87/// The handler must implement `EffectHandler` and `StatefulHandler` traits.
88///
89/// # Arguments
90///
91/// * `ctx` - Phase context for effect handlers
92/// * `initial_state` - Optional initial state (if None, creates a new state)
93/// * `config` - Event loop configuration
94/// * `handler` - Custom effect handler (e.g., MockEffectHandler for testing)
95///
96/// # Returns
97///
98/// Returns the event loop result containing the completion status and final state.
99pub fn run_event_loop_with_handler<'ctx, H>(
100    ctx: &mut PhaseContext<'_>,
101    initial_state: Option<PipelineState>,
102    config: EventLoopConfig,
103    handler: &mut H,
104) -> Result<EventLoopResult>
105where
106    H: EffectHandler<'ctx> + StatefulHandler,
107{
108    let mut state = initial_state.unwrap_or_else(|| {
109        PipelineState::initial(ctx.config.developer_iters, ctx.config.reviewer_reviews)
110    });
111
112    handler.update_state(state.clone());
113    let mut events_processed = 0;
114
115    ctx.logger.info("Starting reducer-based event loop");
116
117    while !state.is_complete() && events_processed < config.max_iterations {
118        let effect = determine_next_effect(&state);
119
120        // Execute returns EffectResult with both PipelineEvent and UIEvents
121        let result = handler.execute(effect, ctx)?;
122
123        // Display UI events (does not affect state)
124        for ui_event in &result.ui_events {
125            ctx.logger.info(&ui_event.format_for_display());
126        }
127
128        // Apply pipeline event to state (reducer remains pure)
129        let new_state = reduce(state, result.event.clone());
130
131        handler.update_state(new_state.clone());
132        state = new_state;
133
134        events_processed += 1;
135
136        if config.enable_checkpointing {
137            let checkpoint_event = PipelineEvent::CheckpointSaved {
138                trigger: CheckpointTrigger::PhaseTransition,
139            };
140            state = reduce(state, checkpoint_event);
141        }
142    }
143
144    if events_processed >= config.max_iterations {
145        ctx.logger.warn(&format!(
146            "Event loop reached max iterations ({}) without completion",
147            config.max_iterations
148        ));
149    }
150
151    Ok(EventLoopResult {
152        completed: state.is_complete(),
153        events_processed,
154    })
155}
156
157/// Trait for handlers that maintain internal state.
158///
159/// This trait allows the event loop to update the handler's internal state
160/// after each event is processed.
161pub trait StatefulHandler {
162    /// Update the handler's internal state.
163    fn update_state(&mut self, state: PipelineState);
164}
165
166fn run_event_loop_internal(
167    ctx: &mut PhaseContext<'_>,
168    initial_state: Option<PipelineState>,
169    config: EventLoopConfig,
170) -> Result<EventLoopResult> {
171    let mut state = initial_state.unwrap_or_else(|| {
172        PipelineState::initial(ctx.config.developer_iters, ctx.config.reviewer_reviews)
173    });
174
175    let mut handler = MainEffectHandler::new(state.clone());
176    let mut events_processed = 0;
177
178    ctx.logger.info("Starting reducer-based event loop");
179
180    while !state.is_complete() && events_processed < config.max_iterations {
181        let effect = determine_next_effect(&state);
182
183        // Execute returns EffectResult with both PipelineEvent and UIEvents
184        let result = handler.execute(effect, ctx)?;
185
186        // Display UI events (does not affect state)
187        for ui_event in &result.ui_events {
188            ctx.logger.info(&ui_event.format_for_display());
189        }
190
191        // Apply pipeline event to state (reducer remains pure)
192        let new_state = reduce(state, result.event.clone());
193
194        handler.state = new_state.clone();
195        state = new_state;
196
197        events_processed += 1;
198
199        if config.enable_checkpointing {
200            let checkpoint_event = PipelineEvent::CheckpointSaved {
201                trigger: CheckpointTrigger::PhaseTransition,
202            };
203            state = reduce(state, checkpoint_event);
204        }
205    }
206
207    if events_processed >= config.max_iterations {
208        ctx.logger.warn(&format!(
209            "Event loop reached max iterations ({}) without completion",
210            config.max_iterations
211        ));
212    }
213
214    Ok(EventLoopResult {
215        completed: state.is_complete(),
216        events_processed,
217    })
218}
219
#[cfg(test)]
mod tests {
    use super::*;

    /// The config struct stores exactly the values it was built with.
    #[test]
    fn test_event_loop_config_creation() {
        let EventLoopConfig {
            max_iterations,
            enable_checkpointing,
        } = EventLoopConfig {
            max_iterations: 1000,
            enable_checkpointing: true,
        };

        assert_eq!(max_iterations, 1000);
        assert!(enable_checkpointing);
    }

    /// TDD test: run_event_loop_with_handler should accept a generic EffectHandler
    /// allowing MockEffectHandler to be injected for testing.
    #[cfg(feature = "test-utils")]
    #[test]
    fn test_run_event_loop_with_mock_handler() {
        use crate::agents::AgentRegistry;
        use crate::checkpoint::{ExecutionHistory, RunContext};
        use crate::config::Config;
        use crate::executor::MockProcessExecutor;
        use crate::logger::{Colors, Logger};
        use crate::phases::PhaseContext;
        use crate::pipeline::{Stats, Timer};
        use crate::prompts::template_context::TemplateContext;
        use crate::reducer::mock_effect_handler::MockEffectHandler;
        use crate::reducer::PipelineState;
        use crate::workspace::MemoryWorkspace;
        use std::path::PathBuf;
        use std::sync::Arc;

        // Fixtures that the phase context borrows from
        let pipeline_config = Config::default();
        let palette = Colors { enabled: false };
        let log = Logger::new(palette);
        let mut clock = Timer::new();
        let mut run_stats = Stats::default();
        let templates = TemplateContext::default();
        let agent_registry = AgentRegistry::new().unwrap();
        let mock_executor = Arc::new(MockProcessExecutor::new());
        let root = PathBuf::from("/test/repo");
        let memory_workspace = MemoryWorkspace::new(root.clone());

        // Phase context wired entirely to in-memory / mock collaborators
        let mut ctx = PhaseContext {
            config: &pipeline_config,
            registry: &agent_registry,
            logger: &log,
            colors: &palette,
            timer: &mut clock,
            stats: &mut run_stats,
            developer_agent: "test-developer",
            reviewer_agent: "test-reviewer",
            review_guidelines: None,
            template_context: &templates,
            run_context: RunContext::new(),
            execution_history: ExecutionHistory::new(),
            prompt_history: std::collections::HashMap::new(),
            executor: &*mock_executor,
            executor_arc: Arc::clone(&mock_executor) as Arc<dyn crate::executor::ProcessExecutor>,
            repo_root: &root,
            workspace: &memory_workspace,
        };

        // Mock handler seeded with the same state the loop starts from
        let start_state = PipelineState::initial(1, 0);
        let mut handler = MockEffectHandler::new(start_state.clone());

        let loop_config = EventLoopConfig {
            max_iterations: 100,
            enable_checkpointing: false,
        };

        // The generic entry point must accept the mock handler and run with it
        let outcome =
            run_event_loop_with_handler(&mut ctx, Some(start_state), loop_config, &mut handler);

        assert!(outcome.is_ok(), "Event loop should complete successfully");

        // The mock records every effect it was asked to execute
        let captured = handler.effect_count();
        assert!(
            captured > 0,
            "Mock handler should have captured at least one effect"
        );
    }
}