ralph_workflow/app/
event_loop.rs1use crate::phases::PhaseContext;
8use crate::reducer::{
9 determine_next_effect, reduce, CheckpointTrigger, EffectHandler, MainEffectHandler,
10 PipelineEvent, PipelineState,
11};
12use anyhow::Result;
13
/// Iteration cap for the event loop.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// callers use it as the value for `EventLoopConfig::max_iterations` — confirm.
pub const MAX_EVENT_LOOP_ITERATIONS: usize = 1_000;
20
/// Settings that control a single run of the event loop.
#[derive(Debug, Clone)]
pub struct EventLoopConfig {
    /// Maximum number of events the loop processes before it stops, even if
    /// the pipeline state has not reached completion.
    pub max_iterations: usize,
    /// When `true`, a `PipelineEvent::CheckpointSaved` event is reduced into
    /// the state after every processed event.
    pub enable_checkpointing: bool,
}
29
/// Outcome reported by the event loop once it stops.
#[derive(Clone, Debug)]
pub struct EventLoopResult {
    /// Whether the pipeline state reported completion when the loop stopped.
    pub completed: bool,
    /// Number of events that were reduced into the state during the run.
    pub events_processed: usize,
}
38
39pub fn run_event_loop(
60 ctx: &mut PhaseContext<'_>,
61 initial_state: Option<PipelineState>,
62 config: EventLoopConfig,
63) -> Result<EventLoopResult> {
64 let loop_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
65 run_event_loop_internal(ctx, initial_state.clone(), config)
66 }));
67
68 match loop_result {
69 Ok(result) => result,
70 Err(_) => {
71 ctx.logger.error("Event loop recovered from panic");
72 let _fallback_state = initial_state.unwrap_or_else(|| {
73 PipelineState::initial(ctx.config.developer_iters, ctx.config.reviewer_reviews)
74 });
75
76 Ok(EventLoopResult {
77 completed: false,
78 events_processed: 0,
79 })
80 }
81 }
82}
83
84pub fn run_event_loop_with_handler<'ctx, H>(
100 ctx: &mut PhaseContext<'_>,
101 initial_state: Option<PipelineState>,
102 config: EventLoopConfig,
103 handler: &mut H,
104) -> Result<EventLoopResult>
105where
106 H: EffectHandler<'ctx> + StatefulHandler,
107{
108 let mut state = initial_state.unwrap_or_else(|| {
109 PipelineState::initial(ctx.config.developer_iters, ctx.config.reviewer_reviews)
110 });
111
112 handler.update_state(state.clone());
113 let mut events_processed = 0;
114
115 ctx.logger.info("Starting reducer-based event loop");
116
117 while !state.is_complete() && events_processed < config.max_iterations {
118 let effect = determine_next_effect(&state);
119
120 let result = handler.execute(effect, ctx)?;
122
123 for ui_event in &result.ui_events {
125 ctx.logger.info(&ui_event.format_for_display());
126 }
127
128 let new_state = reduce(state, result.event.clone());
130
131 handler.update_state(new_state.clone());
132 state = new_state;
133
134 events_processed += 1;
135
136 if config.enable_checkpointing {
137 let checkpoint_event = PipelineEvent::CheckpointSaved {
138 trigger: CheckpointTrigger::PhaseTransition,
139 };
140 state = reduce(state, checkpoint_event);
141 }
142 }
143
144 if events_processed >= config.max_iterations {
145 ctx.logger.warn(&format!(
146 "Event loop reached max iterations ({}) without completion",
147 config.max_iterations
148 ));
149 }
150
151 Ok(EventLoopResult {
152 completed: state.is_complete(),
153 events_processed,
154 })
155}
156
/// An effect handler that can be handed the current pipeline state.
///
/// `run_event_loop_with_handler` calls [`StatefulHandler::update_state`] once
/// before the loop starts and again after every event is reduced.
pub trait StatefulHandler {
    /// Receives an owned copy of the latest pipeline state; what is done with
    /// it is up to the implementor.
    fn update_state(&mut self, state: PipelineState);
}
165
166fn run_event_loop_internal(
167 ctx: &mut PhaseContext<'_>,
168 initial_state: Option<PipelineState>,
169 config: EventLoopConfig,
170) -> Result<EventLoopResult> {
171 let mut state = initial_state.unwrap_or_else(|| {
172 PipelineState::initial(ctx.config.developer_iters, ctx.config.reviewer_reviews)
173 });
174
175 let mut handler = MainEffectHandler::new(state.clone());
176 let mut events_processed = 0;
177
178 ctx.logger.info("Starting reducer-based event loop");
179
180 while !state.is_complete() && events_processed < config.max_iterations {
181 let effect = determine_next_effect(&state);
182
183 let result = handler.execute(effect, ctx)?;
185
186 for ui_event in &result.ui_events {
188 ctx.logger.info(&ui_event.format_for_display());
189 }
190
191 let new_state = reduce(state, result.event.clone());
193
194 handler.state = new_state.clone();
195 state = new_state;
196
197 events_processed += 1;
198
199 if config.enable_checkpointing {
200 let checkpoint_event = PipelineEvent::CheckpointSaved {
201 trigger: CheckpointTrigger::PhaseTransition,
202 };
203 state = reduce(state, checkpoint_event);
204 }
205 }
206
207 if events_processed >= config.max_iterations {
208 ctx.logger.warn(&format!(
209 "Event loop reached max iterations ({}) without completion",
210 config.max_iterations
211 ));
212 }
213
214 Ok(EventLoopResult {
215 completed: state.is_complete(),
216 events_processed,
217 })
218}
219
#[cfg(test)]
mod tests {
    use super::*;

    /// The config struct stores exactly the values it was built with.
    #[test]
    fn test_event_loop_config_creation() {
        let config = EventLoopConfig {
            max_iterations: 1000,
            enable_checkpointing: true,
        };
        assert_eq!(config.max_iterations, 1000);
        assert!(config.enable_checkpointing);
    }

    /// Drives `run_event_loop_with_handler` with a mock handler and checks
    /// that the loop returns `Ok` and that the handler saw at least one effect.
    #[cfg(feature = "test-utils")]
    #[test]
    fn test_run_event_loop_with_mock_handler() {
        use crate::agents::AgentRegistry;
        use crate::checkpoint::{ExecutionHistory, RunContext};
        use crate::config::Config;
        use crate::executor::MockProcessExecutor;
        use crate::logger::{Colors, Logger};
        use crate::phases::PhaseContext;
        use crate::pipeline::{Stats, Timer};
        use crate::prompts::template_context::TemplateContext;
        use crate::reducer::mock_effect_handler::MockEffectHandler;
        use crate::reducer::PipelineState;
        use crate::workspace::MemoryWorkspace;
        use std::path::PathBuf;
        use std::sync::Arc;

        // Test fixtures that back the borrowed fields of `PhaseContext`.
        let config = Config::default();
        let colors = Colors { enabled: false };
        let logger = Logger::new(colors);
        let mut timer = Timer::new();
        let mut stats = Stats::default();
        let template_context = TemplateContext::default();
        let registry = AgentRegistry::new().unwrap();
        let executor = Arc::new(MockProcessExecutor::new());
        let repo_root = PathBuf::from("/test/repo");
        let workspace = MemoryWorkspace::new(repo_root.clone());

        let mut ctx = PhaseContext {
            config: &config,
            registry: &registry,
            logger: &logger,
            colors: &colors,
            timer: &mut timer,
            stats: &mut stats,
            developer_agent: "test-developer",
            reviewer_agent: "test-reviewer",
            review_guidelines: None,
            template_context: &template_context,
            run_context: RunContext::new(),
            execution_history: ExecutionHistory::new(),
            prompt_history: std::collections::HashMap::new(),
            executor: &*executor,
            executor_arc: Arc::clone(&executor) as Arc<dyn crate::executor::ProcessExecutor>,
            repo_root: &repo_root,
            workspace: &workspace,
        };

        let state = PipelineState::initial(1, 0);
        let mut handler = MockEffectHandler::new(state.clone());

        let loop_config = EventLoopConfig {
            max_iterations: 100,
            enable_checkpointing: false,
        };

        let result = run_event_loop_with_handler(&mut ctx, Some(state), loop_config, &mut handler);

        assert!(result.is_ok(), "Event loop should complete successfully");

        assert!(
            handler.effect_count() > 0,
            "Mock handler should have captured at least one effect"
        );
    }
}