ralph_workflow/app/
event_loop.rs1use crate::phases::PhaseContext;
8use crate::reducer::{
9 determine_next_effect, reduce, CheckpointTrigger, EffectHandler, MainEffectHandler,
10 PipelineEvent, PipelineState,
11};
12use anyhow::Result;
13
/// Tunables for a single run of the reducer-based event loop.
#[derive(Debug, Clone)]
pub struct EventLoopConfig {
    /// Upper bound on the number of effect/event iterations the loop will
    /// perform before it stops, whether or not the pipeline has completed.
    pub max_iterations: usize,
    /// When `true`, a `CheckpointSaved` event (phase-transition trigger) is
    /// reduced into the pipeline state after every processed event.
    pub enable_checkpointing: bool,
}
22
/// Summary returned once the event loop stops running.
#[derive(Debug, Clone)]
pub struct EventLoopResult {
    /// `true` when the final pipeline state reported itself complete;
    /// `false` when the loop stopped early (iteration cap or recovered panic).
    pub completed: bool,
    /// Number of effect/event iterations that were carried out.
    /// Reported as `0` when the loop was aborted by a panic.
    pub events_processed: usize,
}
31
32pub fn run_event_loop(
53 ctx: &mut PhaseContext<'_>,
54 initial_state: Option<PipelineState>,
55 config: EventLoopConfig,
56) -> Result<EventLoopResult> {
57 let loop_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
58 run_event_loop_internal(ctx, initial_state.clone(), config)
59 }));
60
61 match loop_result {
62 Ok(result) => result,
63 Err(_) => {
64 ctx.logger.error("Event loop recovered from panic");
65 let _fallback_state = initial_state.unwrap_or_else(|| {
66 PipelineState::initial(ctx.config.developer_iters, ctx.config.reviewer_reviews)
67 });
68
69 Ok(EventLoopResult {
70 completed: false,
71 events_processed: 0,
72 })
73 }
74 }
75}
76
77pub fn run_event_loop_with_handler<'ctx, H>(
93 ctx: &mut PhaseContext<'_>,
94 initial_state: Option<PipelineState>,
95 config: EventLoopConfig,
96 handler: &mut H,
97) -> Result<EventLoopResult>
98where
99 H: EffectHandler<'ctx> + StatefulHandler,
100{
101 let mut state = initial_state.unwrap_or_else(|| {
102 PipelineState::initial(ctx.config.developer_iters, ctx.config.reviewer_reviews)
103 });
104
105 handler.update_state(state.clone());
106 let mut events_processed = 0;
107
108 ctx.logger.info("Starting reducer-based event loop");
109
110 while !state.is_complete() && events_processed < config.max_iterations {
111 let effect = determine_next_effect(&state);
112
113 let event = handler.execute(effect, ctx)?;
114
115 let new_state = reduce(state, event.clone());
116
117 handler.update_state(new_state.clone());
118 state = new_state;
119
120 events_processed += 1;
121
122 if config.enable_checkpointing {
123 let checkpoint_event = PipelineEvent::CheckpointSaved {
124 trigger: CheckpointTrigger::PhaseTransition,
125 };
126 state = reduce(state, checkpoint_event);
127 }
128 }
129
130 if events_processed >= config.max_iterations {
131 ctx.logger.warn(&format!(
132 "Event loop reached max iterations ({}) without completion",
133 config.max_iterations
134 ));
135 }
136
137 Ok(EventLoopResult {
138 completed: state.is_complete(),
139 events_processed,
140 })
141}
142
143pub trait StatefulHandler {
148 fn update_state(&mut self, state: PipelineState);
150}
151
152fn run_event_loop_internal(
153 ctx: &mut PhaseContext<'_>,
154 initial_state: Option<PipelineState>,
155 config: EventLoopConfig,
156) -> Result<EventLoopResult> {
157 let mut state = initial_state.unwrap_or_else(|| {
158 PipelineState::initial(ctx.config.developer_iters, ctx.config.reviewer_reviews)
159 });
160
161 let mut handler = MainEffectHandler::new(state.clone());
162 let mut events_processed = 0;
163
164 ctx.logger.info("Starting reducer-based event loop");
165
166 while !state.is_complete() && events_processed < config.max_iterations {
167 let effect = determine_next_effect(&state);
168
169 let event = handler.execute(effect, ctx)?;
170
171 let new_state = reduce(state, event.clone());
172
173 handler.state = new_state.clone();
174 state = new_state;
175
176 events_processed += 1;
177
178 if config.enable_checkpointing {
179 let checkpoint_event = PipelineEvent::CheckpointSaved {
180 trigger: CheckpointTrigger::PhaseTransition,
181 };
182 state = reduce(state, checkpoint_event);
183 }
184 }
185
186 if events_processed >= config.max_iterations {
187 ctx.logger.warn(&format!(
188 "Event loop reached max iterations ({}) without completion",
189 config.max_iterations
190 ));
191 }
192
193 Ok(EventLoopResult {
194 completed: state.is_complete(),
195 events_processed,
196 })
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    /// The config struct stores exactly the values it was built with.
    #[test]
    fn test_event_loop_config_creation() {
        let config = EventLoopConfig {
            max_iterations: 1000,
            enable_checkpointing: true,
        };
        assert_eq!(config.max_iterations, 1000);
        assert!(config.enable_checkpointing);
    }

    /// Drives `run_event_loop_with_handler` with a mock handler and checks
    /// that the loop finishes without error and that effects reached the mock.
    // Gated on `test-utils`; presumably the mock handler, mock executor and
    // in-memory workspace are only compiled with that feature — TODO confirm.
    #[cfg(feature = "test-utils")]
    #[test]
    fn test_run_event_loop_with_mock_handler() {
        use crate::agents::AgentRegistry;
        use crate::checkpoint::{ExecutionHistory, RunContext};
        use crate::config::Config;
        use crate::executor::MockProcessExecutor;
        use crate::logger::{Colors, Logger};
        use crate::phases::PhaseContext;
        use crate::pipeline::{Stats, Timer};
        use crate::prompts::template_context::TemplateContext;
        use crate::reducer::mock_effect_handler::MockEffectHandler;
        use crate::reducer::PipelineState;
        use crate::workspace::MemoryWorkspace;
        use std::path::PathBuf;
        use std::sync::Arc;

        // Build every collaborator `PhaseContext` borrows from.
        let config = Config::default();
        let colors = Colors { enabled: false };
        let logger = Logger::new(colors);
        let mut timer = Timer::new();
        let mut stats = Stats::default();
        let template_context = TemplateContext::default();
        let registry = AgentRegistry::new().unwrap();
        let executor = Arc::new(MockProcessExecutor::new());
        let repo_root = PathBuf::from("/test/repo");
        let workspace = MemoryWorkspace::new(repo_root.clone());

        let mut ctx = PhaseContext {
            config: &config,
            registry: &registry,
            logger: &logger,
            colors: &colors,
            timer: &mut timer,
            stats: &mut stats,
            developer_agent: "test-developer",
            reviewer_agent: "test-reviewer",
            review_guidelines: None,
            template_context: &template_context,
            run_context: RunContext::new(),
            execution_history: ExecutionHistory::new(),
            prompt_history: std::collections::HashMap::new(),
            executor: &*executor,
            executor_arc: Arc::clone(&executor) as Arc<dyn crate::executor::ProcessExecutor>,
            repo_root: &repo_root,
            workspace: &workspace,
        };

        // The same initial state seeds both the loop and the mock handler.
        let state = PipelineState::initial(1, 0);
        let mut handler = MockEffectHandler::new(state.clone());

        let loop_config = EventLoopConfig {
            max_iterations: 100,
            enable_checkpointing: false,
        };

        let result = run_event_loop_with_handler(&mut ctx, Some(state), loop_config, &mut handler);

        assert!(result.is_ok(), "Event loop should complete successfully");

        assert!(
            handler.effect_count() > 0,
            "Mock handler should have captured at least one effect"
        );
    }
}