Skip to main content

ralph_workflow/app/
event_loop.rs

//! Event loop for the reducer-based pipeline architecture.
//!
//! This module implements the main event loop that coordinates the reducer,
//! effect handlers, and orchestration logic. It provides a unified way to
//! run the pipeline using the event-sourced architecture from RFC-004.
6
7use crate::phases::PhaseContext;
8use crate::reducer::{
9    determine_next_effect, reduce, CheckpointTrigger, EffectHandler, MainEffectHandler,
10    PipelineEvent, PipelineState,
11};
12use anyhow::Result;
13
/// Tunable settings for a single run of the event loop.
#[derive(Debug, Clone)]
pub struct EventLoopConfig {
    /// Upper bound on the number of events the loop will process; acts as a
    /// guard against infinite loops.
    pub max_iterations: usize,
    /// When `true`, a checkpoint event is applied to the pipeline state after
    /// every processed event.
    pub enable_checkpointing: bool,
}
22
/// Summary returned once the event loop stops running.
#[derive(Clone, Debug)]
pub struct EventLoopResult {
    /// `true` when the pipeline state reported completion at loop exit.
    pub completed: bool,
    /// Number of events that were processed before the loop stopped.
    pub events_processed: usize,
}
31
32/// Run the main event loop for the reducer-based pipeline.
33///
34/// This function orchestrates pipeline execution by repeatedly:
35/// 1. Determining the next effect based on the current state
36/// 2. Executing the effect through the effect handler (which performs side effects)
37/// 3. Applying the resulting event to state through the reducer (pure function)
38/// 4. Repeating until a terminal state is reached or max iterations exceeded
39///
40/// The entire event loop is wrapped in panic recovery to ensure the pipeline
41/// never crashes due to agent failures (including segmentation faults).
42///
43/// # Arguments
44///
45/// * `ctx` - Phase context for effect handlers
46/// * `initial_state` - Optional initial state (if None, creates a new state)
47/// * `config` - Event loop configuration
48///
49/// # Returns
50///
51/// Returns the event loop result containing the completion status and final state.
52pub fn run_event_loop(
53    ctx: &mut PhaseContext<'_>,
54    initial_state: Option<PipelineState>,
55    config: EventLoopConfig,
56) -> Result<EventLoopResult> {
57    let loop_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
58        run_event_loop_internal(ctx, initial_state.clone(), config)
59    }));
60
61    match loop_result {
62        Ok(result) => result,
63        Err(_) => {
64            ctx.logger.error("Event loop recovered from panic");
65            let _fallback_state = initial_state.unwrap_or_else(|| {
66                PipelineState::initial(ctx.config.developer_iters, ctx.config.reviewer_reviews)
67            });
68
69            Ok(EventLoopResult {
70                completed: false,
71                events_processed: 0,
72            })
73        }
74    }
75}
76
77/// Run the event loop with a custom effect handler.
78///
79/// This variant allows injecting a custom effect handler for testing.
80/// The handler must implement `EffectHandler` and `StatefulHandler` traits.
81///
82/// # Arguments
83///
84/// * `ctx` - Phase context for effect handlers
85/// * `initial_state` - Optional initial state (if None, creates a new state)
86/// * `config` - Event loop configuration
87/// * `handler` - Custom effect handler (e.g., MockEffectHandler for testing)
88///
89/// # Returns
90///
91/// Returns the event loop result containing the completion status and final state.
92pub fn run_event_loop_with_handler<'ctx, H>(
93    ctx: &mut PhaseContext<'_>,
94    initial_state: Option<PipelineState>,
95    config: EventLoopConfig,
96    handler: &mut H,
97) -> Result<EventLoopResult>
98where
99    H: EffectHandler<'ctx> + StatefulHandler,
100{
101    let mut state = initial_state.unwrap_or_else(|| {
102        PipelineState::initial(ctx.config.developer_iters, ctx.config.reviewer_reviews)
103    });
104
105    handler.update_state(state.clone());
106    let mut events_processed = 0;
107
108    ctx.logger.info("Starting reducer-based event loop");
109
110    while !state.is_complete() && events_processed < config.max_iterations {
111        let effect = determine_next_effect(&state);
112
113        let event = handler.execute(effect, ctx)?;
114
115        let new_state = reduce(state, event.clone());
116
117        handler.update_state(new_state.clone());
118        state = new_state;
119
120        events_processed += 1;
121
122        if config.enable_checkpointing {
123            let checkpoint_event = PipelineEvent::CheckpointSaved {
124                trigger: CheckpointTrigger::PhaseTransition,
125            };
126            state = reduce(state, checkpoint_event);
127        }
128    }
129
130    if events_processed >= config.max_iterations {
131        ctx.logger.warn(&format!(
132            "Event loop reached max iterations ({}) without completion",
133            config.max_iterations
134        ));
135    }
136
137    Ok(EventLoopResult {
138        completed: state.is_complete(),
139        events_processed,
140    })
141}
142
143/// Trait for handlers that maintain internal state.
144///
145/// This trait allows the event loop to update the handler's internal state
146/// after each event is processed.
147pub trait StatefulHandler {
148    /// Update the handler's internal state.
149    fn update_state(&mut self, state: PipelineState);
150}
151
152fn run_event_loop_internal(
153    ctx: &mut PhaseContext<'_>,
154    initial_state: Option<PipelineState>,
155    config: EventLoopConfig,
156) -> Result<EventLoopResult> {
157    let mut state = initial_state.unwrap_or_else(|| {
158        PipelineState::initial(ctx.config.developer_iters, ctx.config.reviewer_reviews)
159    });
160
161    let mut handler = MainEffectHandler::new(state.clone());
162    let mut events_processed = 0;
163
164    ctx.logger.info("Starting reducer-based event loop");
165
166    while !state.is_complete() && events_processed < config.max_iterations {
167        let effect = determine_next_effect(&state);
168
169        let event = handler.execute(effect, ctx)?;
170
171        let new_state = reduce(state, event.clone());
172
173        handler.state = new_state.clone();
174        state = new_state;
175
176        events_processed += 1;
177
178        if config.enable_checkpointing {
179            let checkpoint_event = PipelineEvent::CheckpointSaved {
180                trigger: CheckpointTrigger::PhaseTransition,
181            };
182            state = reduce(state, checkpoint_event);
183        }
184    }
185
186    if events_processed >= config.max_iterations {
187        ctx.logger.warn(&format!(
188            "Event loop reached max iterations ({}) without completion",
189            config.max_iterations
190        ));
191    }
192
193    Ok(EventLoopResult {
194        completed: state.is_complete(),
195        events_processed,
196    })
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    /// The config struct holds exactly the values it was constructed with.
    #[test]
    fn test_event_loop_config_creation() {
        let EventLoopConfig {
            max_iterations,
            enable_checkpointing,
        } = EventLoopConfig {
            max_iterations: 1000,
            enable_checkpointing: true,
        };

        assert_eq!(max_iterations, 1000);
        assert!(enable_checkpointing);
    }

    /// TDD test: `run_event_loop_with_handler` must be generic over the effect
    /// handler so that a `MockEffectHandler` can be injected in tests.
    #[cfg(feature = "test-utils")]
    #[test]
    fn test_run_event_loop_with_mock_handler() {
        use std::collections::HashMap;
        use std::path::PathBuf;
        use std::sync::Arc;

        use crate::agents::AgentRegistry;
        use crate::checkpoint::{ExecutionHistory, RunContext};
        use crate::config::Config;
        use crate::executor::MockProcessExecutor;
        use crate::logger::{Colors, Logger};
        use crate::phases::PhaseContext;
        use crate::pipeline::{Stats, Timer};
        use crate::prompts::template_context::TemplateContext;
        use crate::reducer::mock_effect_handler::MockEffectHandler;
        use crate::reducer::PipelineState;
        use crate::workspace::MemoryWorkspace;

        // Fixtures that the phase context borrows from.
        let pipeline_config = Config::default();
        let palette = Colors { enabled: false };
        let log = Logger::new(palette);
        let mut stopwatch = Timer::new();
        let mut run_stats = Stats::default();
        let templates = TemplateContext::default();
        let agent_registry = AgentRegistry::new().unwrap();
        let mock_executor = Arc::new(MockProcessExecutor::new());
        let root = PathBuf::from("/test/repo");
        let memory_workspace = MemoryWorkspace::new(root.clone());

        // Phase context wired entirely to in-memory / mock collaborators.
        let mut phase_ctx = PhaseContext {
            config: &pipeline_config,
            registry: &agent_registry,
            logger: &log,
            colors: &palette,
            timer: &mut stopwatch,
            stats: &mut run_stats,
            developer_agent: "test-developer",
            reviewer_agent: "test-reviewer",
            review_guidelines: None,
            template_context: &templates,
            run_context: RunContext::new(),
            execution_history: ExecutionHistory::new(),
            prompt_history: HashMap::new(),
            executor: &*mock_executor,
            executor_arc: Arc::clone(&mock_executor) as Arc<dyn crate::executor::ProcessExecutor>,
            repo_root: &root,
            workspace: &memory_workspace,
        };

        // Mock handler seeded with the same state the loop starts from.
        let starting_state = PipelineState::initial(1, 0);
        let mut mock_handler = MockEffectHandler::new(starting_state.clone());

        let settings = EventLoopConfig {
            max_iterations: 100,
            enable_checkpointing: false,
        };

        // Must compile and run with the injected mock handler.
        let outcome = run_event_loop_with_handler(
            &mut phase_ctx,
            Some(starting_state),
            settings,
            &mut mock_handler,
        );

        assert!(outcome.is_ok(), "Event loop should complete successfully");

        // The loop should have routed at least one effect through the mock.
        assert!(
            mock_handler.effect_count() > 0,
            "Mock handler should have captured at least one effect"
        );
    }
}