Skip to main content

ralph_workflow/app/
event_loop.rs

//! Event loop for the reducer-based pipeline architecture.
//!
//! This module implements the main event loop that coordinates the reducer,
//! effect handlers, and orchestration logic. It provides a unified way to
//! run the pipeline using the event-sourced architecture from RFC-004.
//!
//! # Non-Terminating Pipeline Architecture
//!
//! The pipeline is designed to be **non-terminating by default** for unattended operation.
//! It must NEVER exit early due to internal failures, budget exhaustion, or agent errors.
//!
//! ## Failure Handling Flow
//!
//! 1. Any terminal failure (Status: Failed, budget exhausted, agent chain exhausted)
//!    transitions to the `AwaitingDevFix` phase
//! 2. The `TriggerDevFixFlow` effect writes a completion marker to `.agent/tmp/completion_marker`
//! 3. A dev-fix agent is optionally dispatched for a remediation attempt
//! 4. The `CompletionMarkerEmitted` event transitions to the `Interrupted` phase
//! 5. The `SaveCheckpoint` effect saves state for resume
//! 6. The event loop returns `EventLoopResult { completed: true, ... }`
//!
//! ## Acceptable Termination Reasons
//!
//! The ONLY acceptable reasons for pipeline termination are catastrophic external events:
//! - Process termination (SIGKILL)
//! - Machine outage / power loss
//! - OS kill signal
//! - Unrecoverable panic in an effect handler (caught and logged)
//!
//! All internal errors route through the failure handling flow above.

32use crate::phases::PhaseContext;
33use crate::reducer::effect::{Effect, EffectResult};
34use crate::reducer::event::{AwaitingDevFixEvent, CheckpointTrigger, PipelineEvent, PipelinePhase};
35use crate::reducer::state::ContinuationState;
36use crate::reducer::{
37    determine_next_effect, reduce, EffectHandler, MainEffectHandler, PipelineState,
38};
39use anyhow::Result;
40use serde::Serialize;
41use std::collections::VecDeque;
42use std::path::Path;
43
44/// Create initial pipeline state with continuation limits from config.
45///
46/// This function creates a `PipelineState` with XSD retry and continuation limits
47/// loaded from the config, ensuring these values are available for the reducer
48/// to make deterministic retry decisions.
49pub(crate) fn create_initial_state_with_config(ctx: &PhaseContext<'_>) -> PipelineState {
50    // Config semantics: max_dev_continuations counts continuation attempts *beyond*
51    // the initial attempt. ContinuationState::max_continue_count semantics are
52    // "maximum total attempts including initial".
53    let max_dev_continuations = ctx.config.max_dev_continuations.unwrap_or(2);
54    let max_continue_count = 1 + max_dev_continuations;
55
56    let continuation = ContinuationState::with_limits(
57        ctx.config.max_xsd_retries.unwrap_or(10),
58        max_continue_count,
59        ctx.config.max_same_agent_retries.unwrap_or(2),
60    );
61    PipelineState::initial_with_continuation(
62        ctx.config.developer_iters,
63        ctx.config.reviewer_reviews,
64        continuation,
65    )
66}
67
/// Maximum iterations for the main event loop to prevent infinite loops.
///
/// This is a safety limit - the pipeline should complete well before this limit
/// under normal circumstances. If reached, it indicates either a bug in the
/// reducer logic or an extremely complex project.
///
/// NOTE: Even 1_000_000 can still be too low for extremely slow-progress runs.
/// If this cap is hit in practice, prefer making it configurable and/or
/// investigating why the reducer is not converging.
pub const MAX_EVENT_LOOP_ITERATIONS: usize = 1_000_000;
78
/// Configuration for the event loop.
#[derive(Clone, Debug)]
pub struct EventLoopConfig {
    /// Maximum number of iterations to prevent infinite loops.
    /// See [`MAX_EVENT_LOOP_ITERATIONS`] for the safety-cap rationale.
    pub max_iterations: usize,
}
85
/// Result of event loop execution.
#[derive(Debug, Clone)]
pub struct EventLoopResult {
    /// Whether the pipeline completed. Note this is also `true` on forced
    /// completion paths (e.g. after an unrecoverable handler error once the
    /// state is terminal) — see the module docs on non-termination.
    pub completed: bool,
    /// Total events processed.
    pub events_processed: usize,
    /// Final reducer phase when the loop stopped.
    pub final_phase: PipelinePhase,
    /// Final pipeline state (for metrics and summary).
    pub final_state: PipelineState,
}
98
/// Workspace-relative path where the event-loop trace is dumped (JSONL).
const EVENT_LOOP_TRACE_PATH: &str = ".agent/tmp/event_loop_trace.jsonl";
/// Default number of most-recent trace entries retained in memory.
const DEFAULT_EVENT_LOOP_TRACE_CAPACITY: usize = 200;
101
/// One row of the JSONL event-loop trace: the effect executed, the event it
/// produced, and a snapshot of loop-relevant state counters after reduction.
#[derive(Clone, Serialize, Debug)]
struct EventTraceEntry {
    /// Event-loop iteration (events processed so far) when this was recorded.
    iteration: usize,
    /// Debug rendering of the effect that was executed.
    effect: String,
    /// Debug rendering of the event fed to the reducer.
    event: String,
    /// Debug rendering of the pipeline phase after reduction.
    phase: String,
    /// Copied from `state.continuation.xsd_retry_pending`.
    xsd_retry_pending: bool,
    /// Copied from `state.continuation.xsd_retry_count`.
    xsd_retry_count: u32,
    /// Copied from `state.continuation.invalid_output_attempts`.
    invalid_output_attempts: u32,
    /// Copied from `state.agent_chain.current_agent_index`.
    agent_index: usize,
    /// Copied from `state.agent_chain.current_model_index`.
    model_index: usize,
    /// Copied from `state.agent_chain.retry_cycle`.
    retry_cycle: u32,
}
115
/// Bounded in-memory ring of the most recent trace entries, dumped to
/// `EVENT_LOOP_TRACE_PATH` when the loop fails or hits its iteration cap.
#[derive(Debug)]
struct EventTraceBuffer {
    /// Maximum number of entries retained (clamped to at least 1 in `new`).
    capacity: usize,
    /// Entries in insertion order; oldest are evicted first.
    entries: VecDeque<EventTraceEntry>,
}
121
122impl EventTraceBuffer {
123    fn new(capacity: usize) -> Self {
124        Self {
125            capacity: capacity.max(1),
126            entries: VecDeque::new(),
127        }
128    }
129
130    fn push(&mut self, entry: EventTraceEntry) {
131        self.entries.push_back(entry);
132        while self.entries.len() > self.capacity {
133            self.entries.pop_front();
134        }
135    }
136
137    fn entries(&self) -> &VecDeque<EventTraceEntry> {
138        &self.entries
139    }
140}
141
/// Terminal record appended to the trace dump: tags the final pipeline state
/// with `kind: "final_state"` and the reason the loop stopped.
#[derive(Serialize)]
struct EventTraceFinalState<'a> {
    /// Always `"final_state"`; lets consumers distinguish this row from entries.
    kind: &'static str,
    /// Why the dump happened (e.g. "panic", "max_iterations").
    reason: &'a str,
    /// The full pipeline state at the time of the dump.
    state: &'a PipelineState,
}
148
149fn build_trace_entry(
150    iteration: usize,
151    state: &PipelineState,
152    effect: &str,
153    event: &str,
154) -> EventTraceEntry {
155    EventTraceEntry {
156        iteration,
157        effect: effect.to_string(),
158        event: event.to_string(),
159        phase: format!("{:?}", state.phase),
160        xsd_retry_pending: state.continuation.xsd_retry_pending,
161        xsd_retry_count: state.continuation.xsd_retry_count,
162        invalid_output_attempts: state.continuation.invalid_output_attempts,
163        agent_index: state.agent_chain.current_agent_index,
164        model_index: state.agent_chain.current_model_index,
165        retry_cycle: state.agent_chain.retry_cycle,
166    }
167}
168
169/// Extract ErrorEvent from anyhow::Error if present.
170///
171/// # Error Event Processing Architecture
172///
173/// Effect handlers return errors through `Err(ErrorEvent::Variant.into())`. This function
174/// extracts the original `ErrorEvent` so it can be processed through the reducer.
175///
176/// ## Why Downcast?
177///
178/// When an effect handler returns `Err(ErrorEvent::Variant.into())`, the error is wrapped
179/// in an `anyhow::Error`. Since `ErrorEvent` implements `std::error::Error`, anyhow's
180/// blanket `From` implementation preserves the original error type, allowing us to downcast
181/// back to `ErrorEvent` for reducer processing.
182///
183/// ## Processing Flow
184///
185/// 1. Handler returns `Err(ErrorEvent::AgentChainExhausted { ... }.into())`
186/// 2. Event loop catches the error and calls this function
187/// 3. If downcast succeeds, wrap in `PipelineEvent::PromptInput(PromptInputEvent::HandlerError { ... })`
188///    and process through reducer
189/// 4. If downcast fails, return `Err()` to terminate the event loop (truly unrecoverable error)
190///
191/// This architecture allows the reducer to decide recovery strategy based on the specific
192/// error type, rather than terminating immediately on any `Err()`.
193fn extract_error_event(err: &anyhow::Error) -> Option<crate::reducer::event::ErrorEvent> {
194    // Handlers are allowed to wrap typed ErrorEvents with additional context
195    // (e.g. via `anyhow::Context`). Search the full error chain so we still
196    // recover the underlying reducer error event.
197    for cause in err.chain() {
198        if let Some(error_event) = cause.downcast_ref::<crate::reducer::event::ErrorEvent>() {
199            return Some(error_event.clone());
200        }
201    }
202    None
203}
204
/// Outcome of running one effect through `execute_effect_guarded`.
enum GuardedEffectResult {
    /// The handler completed, or its typed error was converted into a
    /// reducer-visible event. Boxed — presumably to keep the enum small
    /// relative to `EffectResult`; confirm before changing.
    Ok(Box<EffectResult>),
    /// The handler failed with an error carrying no typed `ErrorEvent`
    /// anywhere in its chain.
    Unrecoverable(anyhow::Error),
    /// The handler panicked; the panic payload was discarded.
    Panic,
}
210
/// Execute a single effect with error and panic isolation.
///
/// Wraps `handler.execute` in `catch_unwind` so a panicking handler cannot
/// tear down the event loop. Outcomes:
/// - handler succeeded → `Ok` with the boxed `EffectResult`;
/// - handler returned an `Err` whose chain contains a typed `ErrorEvent` →
///   `Ok` with a synthesized `PromptInputEvent::HandlerError` event so the
///   reducer decides the recovery strategy;
/// - handler `Err` with no typed event → `Unrecoverable`;
/// - handler panicked → `Panic` (payload discarded).
fn execute_effect_guarded<'ctx, H>(
    handler: &mut H,
    effect: Effect,
    ctx: &mut PhaseContext<'_>,
    state: &PipelineState,
) -> GuardedEffectResult
where
    H: EffectHandler<'ctx>,
{
    // AssertUnwindSafe: the closure captures &mut references across the unwind
    // boundary. Callers treat Panic as fatal for this effect and rebuild state
    // through the reducer, so partially-updated handler state after a panic is
    // presumed acceptable — NOTE(review): confirm handlers uphold this.
    match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        handler.execute(effect, ctx)
    })) {
        Ok(Ok(result)) => GuardedEffectResult::Ok(Box::new(result)),
        Ok(Err(err)) => {
            if let Some(error_event) = extract_error_event(&err) {
                // Convert the typed error into a reducer-visible event, tagged
                // with the phase in which the failure occurred.
                GuardedEffectResult::Ok(Box::new(crate::reducer::effect::EffectResult::event(
                    crate::reducer::event::PipelineEvent::PromptInput(
                        crate::reducer::event::PromptInputEvent::HandlerError {
                            phase: state.phase,
                            error: error_event,
                        },
                    ),
                )))
            } else {
                GuardedEffectResult::Unrecoverable(err)
            }
        }
        Err(_) => GuardedEffectResult::Panic,
    }
}
241
/// Dump the trace buffer plus a terminal `final_state` record to
/// `EVENT_LOOP_TRACE_PATH` as JSONL. Returns `true` iff the file was written.
///
/// Best-effort by design: entries that fail to serialize are logged and
/// skipped, and a hand-built JSON fallback stands in for the final-state
/// record if it cannot be serialized.
fn dump_event_loop_trace(
    ctx: &mut PhaseContext<'_>,
    trace: &EventTraceBuffer,
    final_state: &PipelineState,
    reason: &str,
) -> bool {
    let mut out = String::new();

    // One JSON object per line (JSONL); a bad entry drops only that line.
    for entry in trace.entries() {
        match serde_json::to_string(entry) {
            Ok(line) => {
                out.push_str(&line);
                out.push('\n');
            }
            Err(err) => {
                ctx.logger.error(&format!(
                    "Failed to serialize event loop trace entry: {err}"
                ));
            }
        }
    }

    let final_line = match serde_json::to_string(&EventTraceFinalState {
        kind: "final_state",
        reason,
        state: final_state,
    }) {
        Ok(line) => line,
        Err(err) => {
            ctx.logger.error(&format!(
                "Failed to serialize event loop final state: {err}"
            ));
            // Ensure the file still contains something useful.
            format!(
                "{{\"kind\":\"final_state\",\"reason\":{},\"phase\":{}}}",
                serde_json::to_string(reason).unwrap_or_else(|_| "\"unknown\"".to_string()),
                serde_json::to_string(&format!("{:?}", final_state.phase))
                    .unwrap_or_else(|_| "\"unknown\"".to_string())
            )
        }
    };
    out.push_str(&final_line);
    out.push('\n');

    // Ensure the trace directory exists. While `Workspace::write` is expected to
    // create parent directories, we do this explicitly so trace dumping is
    // resilient even under stricter workspace implementations.
    if let Err(err) = ctx.workspace.create_dir_all(Path::new(".agent/tmp")) {
        ctx.logger
            .error(&format!("Failed to create trace directory: {err}"));
        return false;
    }

    match ctx.workspace.write(Path::new(EVENT_LOOP_TRACE_PATH), &out) {
        Ok(()) => true,
        Err(err) => {
            ctx.logger
                .error(&format!("Failed to write event loop trace: {err}"));
            false
        }
    }
}
304
305fn write_completion_marker_on_error(ctx: &mut PhaseContext<'_>, err: &anyhow::Error) -> bool {
306    if let Err(err) = ctx.workspace.create_dir_all(Path::new(".agent/tmp")) {
307        ctx.logger.error(&format!(
308            "Failed to create completion marker directory: {err}"
309        ));
310        return false;
311    }
312
313    let marker_path = Path::new(".agent/tmp/completion_marker");
314    let content = format!("failure\nUnrecoverable handler error: {err}");
315    match ctx.workspace.write(marker_path, &content) {
316        Ok(()) => true,
317        Err(err) => {
318            ctx.logger.error(&format!(
319                "Failed to write completion marker for unrecoverable handler error: {err}"
320            ));
321            false
322        }
323    }
324}
325
326fn run_event_loop_with_handler_traced<'ctx, H>(
327    ctx: &mut PhaseContext<'_>,
328    initial_state: Option<PipelineState>,
329    config: EventLoopConfig,
330    handler: &mut H,
331) -> Result<EventLoopResult>
332where
333    H: EffectHandler<'ctx> + StatefulHandler,
334{
335    let mut state = initial_state.unwrap_or_else(|| create_initial_state_with_config(ctx));
336
337    handler.update_state(state.clone());
338    let mut events_processed = 0;
339    let mut trace = EventTraceBuffer::new(DEFAULT_EVENT_LOOP_TRACE_CAPACITY);
340
341    ctx.logger.info("Starting reducer-based event loop");
342
343    while events_processed < config.max_iterations {
344        // Check if we're already in a terminal state before executing any effect.
345        // This handles the case where the initial state is already complete
346        // (e.g., resuming from an Interrupted checkpoint).
347        //
348        // Special case: If we just transitioned to Interrupted from AwaitingDevFix
349        // without a checkpoint, allow one more iteration to execute SaveCheckpoint.
350        //
351        // CRITICAL: If we're in AwaitingDevFix and haven't executed TriggerDevFixFlow yet,
352        // allow at least one iteration to execute it, even if approaching max iterations.
353        // This ensures completion marker is ALWAYS written and dev-fix agent is ALWAYS
354        // dispatched before the event loop can exit.
355        let should_allow_checkpoint_save = matches!(state.phase, PipelinePhase::Interrupted)
356            && matches!(state.previous_phase, Some(PipelinePhase::AwaitingDevFix))
357            && state.checkpoint_saved_count == 0;
358
359        let is_awaiting_dev_fix_not_triggered =
360            matches!(state.phase, PipelinePhase::AwaitingDevFix) && !state.dev_fix_triggered;
361
362        if state.is_complete()
363            && !should_allow_checkpoint_save
364            && !is_awaiting_dev_fix_not_triggered
365        {
366            ctx.logger.info(&format!(
367                "Event loop: state already complete (phase: {:?}, checkpoint_saved_count: {})",
368                state.phase, state.checkpoint_saved_count
369            ));
370            break;
371        }
372
373        let effect = determine_next_effect(&state);
374        let effect_str = format!("{effect:?}");
375
376        // Execute returns EffectResult with both PipelineEvent and UIEvents.
377        // Catch panics here so we can still dump a best-effort trace.
378        let result = match execute_effect_guarded(handler, effect, ctx, &state) {
379            GuardedEffectResult::Ok(result) => *result,
380            GuardedEffectResult::Unrecoverable(err) => {
381                // Non-terminating-by-default requirement:
382                // Even "unrecoverable" handler errors must route through reducer-visible
383                // remediation (AwaitingDevFix) so TriggerDevFixFlow can write the completion
384                // marker and dispatch dev-fix.
385                let dumped =
386                    dump_event_loop_trace(ctx, &trace, &state, "unrecoverable_handler_error");
387                let marker_written = write_completion_marker_on_error(ctx, &err);
388                if dumped {
389                    ctx.logger.error(&format!(
390                        "Event loop encountered unrecoverable handler error (trace: {EVENT_LOOP_TRACE_PATH})"
391                    ));
392                } else {
393                    ctx.logger
394                        .error("Event loop encountered unrecoverable handler error");
395                }
396                if marker_written {
397                    ctx.logger
398                        .info("Completion marker written for unrecoverable handler error");
399                }
400
401                // Emit a reducer-visible error that transitions us into AwaitingDevFix.
402                // We don't preserve the original error as a typed ErrorEvent; this is a last-resort
403                // path to guarantee remediation and completion marker semantics.
404                let failure_event = PipelineEvent::PromptInput(
405                    crate::reducer::event::PromptInputEvent::HandlerError {
406                        phase: state.phase,
407                        error: crate::reducer::event::ErrorEvent::WorkspaceWriteFailed {
408                            path: "(unrecoverable_handler_error)".to_string(),
409                            kind: crate::reducer::event::WorkspaceIoErrorKind::Other,
410                        },
411                    },
412                );
413
414                let event_str = format!("{:?}", failure_event);
415                let new_state = reduce(state, failure_event);
416                trace.push(build_trace_entry(
417                    events_processed,
418                    &new_state,
419                    &effect_str,
420                    &event_str,
421                ));
422                handler.update_state(new_state.clone());
423                state = new_state;
424                events_processed += 1;
425
426                continue;
427            }
428            GuardedEffectResult::Panic => {
429                // Handler panics are internal failures and must not terminate the run.
430                // Route through AwaitingDevFix so TriggerDevFixFlow writes the completion marker and
431                // dispatches dev-fix.
432                let dumped = dump_event_loop_trace(ctx, &trace, &state, "panic");
433                if dumped {
434                    ctx.logger.error(&format!(
435                        "Event loop recovered from panic (trace: {EVENT_LOOP_TRACE_PATH})"
436                    ));
437                } else {
438                    ctx.logger.error("Event loop recovered from panic");
439                }
440
441                // Best-effort completion marker for orchestration, even if the dev-fix flow fails.
442                if let Err(err) = ctx.workspace.create_dir_all(Path::new(".agent/tmp")) {
443                    ctx.logger.error(&format!(
444                        "Failed to create completion marker directory: {err}"
445                    ));
446                }
447                let marker_path = Path::new(".agent/tmp/completion_marker");
448                let content = format!(
449                    "failure\nHandler panic in effect execution (phase={:?}, events_processed={})",
450                    state.phase, events_processed
451                );
452                if let Err(err) = ctx.workspace.write(marker_path, &content) {
453                    ctx.logger.error(&format!(
454                        "Failed to write completion marker for handler panic: {err}"
455                    ));
456                }
457
458                let failure_event = PipelineEvent::PromptInput(
459                    crate::reducer::event::PromptInputEvent::HandlerError {
460                        phase: state.phase,
461                        error: crate::reducer::event::ErrorEvent::WorkspaceWriteFailed {
462                            path: "(handler_panic)".to_string(),
463                            kind: crate::reducer::event::WorkspaceIoErrorKind::Other,
464                        },
465                    },
466                );
467
468                let event_str = format!("{:?}", failure_event);
469                let new_state = reduce(state, failure_event);
470                trace.push(build_trace_entry(
471                    events_processed,
472                    &new_state,
473                    &effect_str,
474                    &event_str,
475                ));
476                handler.update_state(new_state.clone());
477                state = new_state;
478                events_processed += 1;
479
480                continue;
481            }
482        };
483
484        // Display UI events (does not affect state)
485        for ui_event in &result.ui_events {
486            ctx.logger
487                .info(&crate::rendering::render_ui_event(ui_event));
488        }
489
490        let event_str = format!("{:?}", result.event);
491
492        // Apply pipeline event to state (reducer remains pure)
493        let new_state = reduce(state, result.event.clone());
494
495        trace.push(build_trace_entry(
496            events_processed,
497            &new_state,
498            &effect_str,
499            &event_str,
500        ));
501        handler.update_state(new_state.clone());
502        state = new_state;
503        events_processed += 1;
504
505        // Apply additional pipeline events in order.
506        for event in result.additional_events {
507            let event_str = format!("{:?}", event);
508            let additional_state = reduce(state, event);
509            trace.push(build_trace_entry(
510                events_processed,
511                &additional_state,
512                &effect_str,
513                &event_str,
514            ));
515            handler.update_state(additional_state.clone());
516            state = additional_state;
517            events_processed += 1;
518        }
519
520        // Update loop detection counters AFTER all events have been processed.
521        // This is critical: additional events can change phase, agent chain, etc.,
522        // and loop detection must consider the final state after all reductions.
523        let current_fingerprint = crate::reducer::compute_effect_fingerprint(&state);
524        state = PipelineState {
525            continuation: state
526                .continuation
527                .update_loop_detection_counters(current_fingerprint),
528            ..state
529        };
530        handler.update_state(state.clone());
531
532        // Check completion AFTER effect execution and state update.
533        // This ensures that transitions to terminal phases (e.g., Interrupted)
534        // have a chance to save their checkpoint before the loop exits.
535        //
536        // Special case: If we just transitioned to Interrupted from AwaitingDevFix
537        // without a checkpoint, allow one more iteration to execute SaveCheckpoint.
538        // This is needed because TriggerDevFixFlow already wrote the completion marker,
539        // making is_complete() return true, but we still want to save the checkpoint
540        // for proper state persistence.
541        //
542        // CRITICAL: If we're in AwaitingDevFix and haven't executed TriggerDevFixFlow yet,
543        // allow at least one iteration to execute it, even if approaching max iterations.
544        let should_allow_checkpoint_save = matches!(state.phase, PipelinePhase::Interrupted)
545            && matches!(state.previous_phase, Some(PipelinePhase::AwaitingDevFix))
546            && state.checkpoint_saved_count == 0;
547
548        let is_awaiting_dev_fix_not_triggered =
549            matches!(state.phase, PipelinePhase::AwaitingDevFix) && !state.dev_fix_triggered;
550
551        if state.is_complete()
552            && !should_allow_checkpoint_save
553            && !is_awaiting_dev_fix_not_triggered
554        {
555            ctx.logger.info(&format!(
556                "Event loop: state became complete (phase: {:?}, checkpoint_saved_count: {})",
557                state.phase, state.checkpoint_saved_count
558            ));
559
560            // DEFENSIVE: If we're in Interrupted from AwaitingDevFix without a checkpoint,
561            // warn that SaveCheckpoint should execute next
562            if matches!(state.phase, PipelinePhase::Interrupted)
563                && matches!(state.previous_phase, Some(PipelinePhase::AwaitingDevFix))
564                && state.checkpoint_saved_count == 0
565            {
566                ctx.logger.warn(
567                    "Interrupted phase reached from AwaitingDevFix without checkpoint saved. \
568                     SaveCheckpoint effect should execute on next iteration.",
569                );
570            }
571
572            break;
573        }
574    }
575
576    // Track if we had to force-complete due to max iterations in AwaitingDevFix
577    let mut forced_completion = false;
578
579    let should_force_checkpoint_after_completion =
580        matches!(state.phase, PipelinePhase::Interrupted)
581            && matches!(state.previous_phase, Some(PipelinePhase::AwaitingDevFix))
582            && state.checkpoint_saved_count == 0;
583
584    if events_processed >= config.max_iterations && should_force_checkpoint_after_completion {
585        ctx.logger.warn(
586            "Max iterations reached after completion marker; forcing SaveCheckpoint execution",
587        );
588
589        let save_effect = Effect::SaveCheckpoint {
590            trigger: CheckpointTrigger::Interrupt,
591        };
592        let save_effect_str = format!("{save_effect:?}");
593        match execute_effect_guarded(handler, save_effect, ctx, &state) {
594            GuardedEffectResult::Ok(result) => {
595                let result = *result;
596                let event_str = format!("{:?}", result.event);
597                state = reduce(state, result.event.clone());
598                trace.push(build_trace_entry(
599                    events_processed,
600                    &state,
601                    &save_effect_str,
602                    &event_str,
603                ));
604                handler.update_state(state.clone());
605                events_processed += 1;
606
607                for event in result.additional_events {
608                    let event_str = format!("{:?}", event);
609                    state = reduce(state, event);
610                    trace.push(build_trace_entry(
611                        events_processed,
612                        &state,
613                        &save_effect_str,
614                        &event_str,
615                    ));
616                    handler.update_state(state.clone());
617                    events_processed += 1;
618                }
619            }
620            GuardedEffectResult::Unrecoverable(err) => {
621                // Non-terminating-by-default: even failures while forcing checkpoint completion
622                // must route through AwaitingDevFix rather than returning Err.
623                let dumped =
624                    dump_event_loop_trace(ctx, &trace, &state, "unrecoverable_handler_error");
625                let marker_written = write_completion_marker_on_error(ctx, &err);
626                if dumped {
627                    ctx.logger.error(&format!(
628                        "Event loop encountered unrecoverable handler error (trace: {EVENT_LOOP_TRACE_PATH})"
629                    ));
630                } else {
631                    ctx.logger
632                        .error("Event loop encountered unrecoverable handler error");
633                }
634                if marker_written {
635                    ctx.logger
636                        .info("Completion marker written for unrecoverable handler error");
637                }
638
639                // We can't safely continue execution here (we are outside the main loop).
640                // State is already terminal (Interrupted from AwaitingDevFix), so return completion
641                // even if SaveCheckpoint fails.
642                return Ok(EventLoopResult {
643                    completed: true,
644                    events_processed,
645                    final_phase: state.phase,
646                    final_state: state.clone(),
647                });
648            }
649            GuardedEffectResult::Panic => {
650                // Panics during forced completion are internal failures; route through AwaitingDevFix.
651                let dumped = dump_event_loop_trace(ctx, &trace, &state, "panic");
652                if dumped {
653                    ctx.logger.error(&format!(
654                        "Event loop recovered from panic (trace: {EVENT_LOOP_TRACE_PATH})"
655                    ));
656                } else {
657                    ctx.logger.error("Event loop recovered from panic");
658                }
659
660                if let Err(err) = ctx.workspace.create_dir_all(Path::new(".agent/tmp")) {
661                    ctx.logger.error(&format!(
662                        "Failed to create completion marker directory: {err}"
663                    ));
664                }
665                let marker_path = Path::new(".agent/tmp/completion_marker");
666                let content = format!(
667                    "failure\nHandler panic during forced completion (phase={:?}, events_processed={})",
668                    state.phase, events_processed
669                );
670                if let Err(err) = ctx.workspace.write(marker_path, &content) {
671                    ctx.logger.error(&format!(
672                        "Failed to write completion marker for handler panic: {err}"
673                    ));
674                }
675
676                // We can't safely continue execution here (we are outside the main loop).
677                // State is already terminal (Interrupted from AwaitingDevFix), so return completion
678                // even if SaveCheckpoint fails.
679                return Ok(EventLoopResult {
680                    completed: true,
681                    events_processed,
682                    final_phase: state.phase,
683                    final_state: state.clone(),
684                });
685            }
686        }
687    }
688
689    if events_processed >= config.max_iterations && !state.is_complete() {
690        let dumped = dump_event_loop_trace(ctx, &trace, &state, "max_iterations");
691
692        // DEFENSIVE: If max iterations reached in AwaitingDevFix without dev_fix_triggered,
693        // this is a bug (TriggerDevFixFlow should execute first). However, to maintain the
694        // non-terminating pipeline principle, we force completion:
695        // 1. Write completion marker (signals orchestration)
696        // 2. Emit CompletionMarkerEmitted event (transitions to Interrupted)
697        // 3. Execute SaveCheckpoint (makes is_complete() return true)
698        //
699        // This ensures the pipeline NEVER exits early due to internal logic bugs.
700        // Budget exhaustion should transition to commit/finalization, not terminate.
701        if matches!(state.phase, PipelinePhase::AwaitingDevFix) {
702            ctx.logger.error(
703                "BUG: Hit max iterations in AwaitingDevFix phase. \
704                 TriggerDevFixFlow should have executed before reaching this point. \
705                 Applying defensive recovery logic.",
706            );
707            ctx.logger
708                .warn("Max iterations reached in AwaitingDevFix - forcing completion marker");
709
710            // Write completion marker
711            if let Err(err) = ctx.workspace.create_dir_all(Path::new(".agent/tmp")) {
712                ctx.logger.error(&format!(
713                    "Failed to create completion marker directory: {err}"
714                ));
715            }
716            let marker_path = Path::new(".agent/tmp/completion_marker");
717            let content = format!(
718                "failure\nMax iterations reached in AwaitingDevFix phase (events_processed={})",
719                events_processed
720            );
721            match ctx.workspace.write(marker_path, &content) {
722                Ok(()) => {
723                    ctx.logger
724                        .info("Completion marker written for max iterations failure");
725                }
726                Err(err) => {
727                    ctx.logger.error(&format!(
728                        "Failed to write completion marker for max iterations failure: {err}"
729                    ));
730                }
731            }
732
733            let completion_event =
734                PipelineEvent::AwaitingDevFix(AwaitingDevFixEvent::CompletionMarkerEmitted {
735                    is_failure: true,
736                });
737            let completion_event_str = format!("{:?}", completion_event);
738            state = reduce(state, completion_event);
739            trace.push(build_trace_entry(
740                events_processed,
741                &state,
742                "ForcedCompletionMarker",
743                &completion_event_str,
744            ));
745            handler.update_state(state.clone());
746            events_processed += 1;
747
748            let save_effect = Effect::SaveCheckpoint {
749                trigger: CheckpointTrigger::Interrupt,
750            };
751            let save_effect_str = format!("{save_effect:?}");
752            match execute_effect_guarded(handler, save_effect, ctx, &state) {
753                GuardedEffectResult::Ok(result) => {
754                    let result = *result;
755                    let event_str = format!("{:?}", result.event);
756                    state = reduce(state, result.event.clone());
757                    trace.push(build_trace_entry(
758                        events_processed,
759                        &state,
760                        &save_effect_str,
761                        &event_str,
762                    ));
763                    handler.update_state(state.clone());
764                    events_processed += 1;
765
766                    for event in result.additional_events {
767                        let event_str = format!("{:?}", event);
768                        state = reduce(state, event);
769                        trace.push(build_trace_entry(
770                            events_processed,
771                            &state,
772                            &save_effect_str,
773                            &event_str,
774                        ));
775                        handler.update_state(state.clone());
776                        events_processed += 1;
777                    }
778                }
779                GuardedEffectResult::Unrecoverable(err) => {
780                    // Non-terminating-by-default: even errors during forced completion must route
781                    // through AwaitingDevFix instead of returning Err.
782                    let dumped =
783                        dump_event_loop_trace(ctx, &trace, &state, "unrecoverable_handler_error");
784                    let marker_written = write_completion_marker_on_error(ctx, &err);
785                    if dumped {
786                        ctx.logger.error(&format!(
787                            "Event loop encountered unrecoverable handler error (trace: {EVENT_LOOP_TRACE_PATH})"
788                        ));
789                    } else {
790                        ctx.logger
791                            .error("Event loop encountered unrecoverable handler error");
792                    }
793                    if marker_written {
794                        ctx.logger
795                            .info("Completion marker written for unrecoverable handler error");
796                    }
797
798                    // We can't safely continue execution here (we are outside the main loop).
799                    // State is already terminal (Interrupted from AwaitingDevFix).
800                    // However, the run did NOT complete cleanly, so report incomplete while still
801                    // having written a best-effort completion marker above.
802                    return Ok(EventLoopResult {
803                        completed: false,
804                        events_processed,
805                        final_phase: state.phase,
806                        final_state: state.clone(),
807                    });
808                }
809                GuardedEffectResult::Panic => {
810                    // Panics during forced completion are internal failures; route through AwaitingDevFix.
811                    let dumped = dump_event_loop_trace(ctx, &trace, &state, "panic");
812                    if dumped {
813                        ctx.logger.error(&format!(
814                            "Event loop recovered from panic (trace: {EVENT_LOOP_TRACE_PATH})"
815                        ));
816                    } else {
817                        ctx.logger.error("Event loop recovered from panic");
818                    }
819
820                    if let Err(err) = ctx.workspace.create_dir_all(Path::new(".agent/tmp")) {
821                        ctx.logger.error(&format!(
822                            "Failed to create completion marker directory: {err}"
823                        ));
824                    }
825                    let marker_path = Path::new(".agent/tmp/completion_marker");
826                    let content = format!(
827                        "failure\nHandler panic during forced completion (phase={:?}, events_processed={})",
828                        state.phase, events_processed
829                    );
830                    if let Err(err) = ctx.workspace.write(marker_path, &content) {
831                        ctx.logger.error(&format!(
832                            "Failed to write completion marker for handler panic: {err}"
833                        ));
834                    }
835
836                    // We can't safely continue execution here (we are outside the main loop).
837                    // State is already terminal (Interrupted from AwaitingDevFix).
838                    // However, the run did NOT complete cleanly, so report incomplete while still
839                    // having written a best-effort completion marker above.
840                    return Ok(EventLoopResult {
841                        completed: false,
842                        events_processed,
843                        final_phase: state.phase,
844                        final_state: state.clone(),
845                    });
846                }
847            }
848
849            forced_completion = true;
850
851            ctx.logger
852                .info("Forced transition to Interrupted phase to satisfy termination requirements");
853        }
854
855        if dumped {
856            ctx.logger.warn(&format!(
857                "Event loop reached max iterations ({}) without completion (trace: {EVENT_LOOP_TRACE_PATH})",
858                config.max_iterations
859            ));
860        } else {
861            ctx.logger.warn(&format!(
862                "Event loop reached max iterations ({}) without completion",
863                config.max_iterations
864            ));
865        }
866
867        if !forced_completion {
868            ctx.logger.error(&format!(
869                "Event loop exiting: reason=max_iterations, phase={:?}, checkpoint_saved_count={}, events_processed={}",
870                state.phase, state.checkpoint_saved_count, events_processed
871            ));
872        }
873    }
874
875    let completed = state.is_complete();
876    if !completed {
877        ctx.logger.warn(&format!(
878            "Event loop exiting without completion: phase={:?}, checkpoint_saved_count={}, \
879             previous_phase={:?}, events_processed={}",
880            state.phase, state.checkpoint_saved_count, state.previous_phase, events_processed
881        ));
882        ctx.logger.info(&format!(
883            "Final state: agent_chain.retry_cycle={}, agent_chain.current_role={:?}",
884            state.agent_chain.retry_cycle, state.agent_chain.current_role
885        ));
886    }
887
888    Ok(EventLoopResult {
889        completed,
890        events_processed,
891        final_phase: state.phase,
892        final_state: state.clone(),
893    })
894}
895
896/// Run the main event loop for the reducer-based pipeline.
897///
898/// This function orchestrates pipeline execution by repeatedly:
899/// 1. Determining the next effect based on the current state
900/// 2. Executing the effect through the effect handler (which performs side effects)
901/// 3. Applying the resulting event to state through the reducer (pure function)
902/// 4. Repeating until a terminal state is reached or max iterations exceeded
903///
904/// The entire event loop is wrapped in panic recovery to ensure the pipeline
905/// never crashes due to agent failures (panics only; aborts/segfaults cannot be recovered).
906///
907/// # Arguments
908///
909/// * `ctx` - Phase context for effect handlers
910/// * `initial_state` - Optional initial state (if None, creates a new state)
911/// * `config` - Event loop configuration
912///
913/// # Returns
914///
915/// Returns the event loop result containing the completion status and final state.
916pub fn run_event_loop(
917    ctx: &mut PhaseContext<'_>,
918    initial_state: Option<PipelineState>,
919    config: EventLoopConfig,
920) -> Result<EventLoopResult> {
921    let state = initial_state.unwrap_or_else(|| create_initial_state_with_config(ctx));
922    let mut handler = MainEffectHandler::new(state.clone());
923    run_event_loop_with_handler_traced(ctx, Some(state), config, &mut handler)
924}
925
926/// Run the event loop with a custom effect handler.
927///
928/// This variant allows injecting a custom effect handler for testing.
929/// The handler must implement `EffectHandler` and `StatefulHandler` traits.
930///
931/// # Arguments
932///
933/// * `ctx` - Phase context for effect handlers
934/// * `initial_state` - Optional initial state (if None, creates a new state)
935/// * `config` - Event loop configuration
936/// * `handler` - Custom effect handler (e.g., MockEffectHandler for testing)
937///
938/// # Returns
939///
940/// Returns the event loop result containing the completion status and final state.
941pub fn run_event_loop_with_handler<'ctx, H>(
942    ctx: &mut PhaseContext<'_>,
943    initial_state: Option<PipelineState>,
944    config: EventLoopConfig,
945    handler: &mut H,
946) -> Result<EventLoopResult>
947where
948    H: EffectHandler<'ctx> + StatefulHandler,
949{
950    run_event_loop_with_handler_traced(ctx, initial_state, config, handler)
951}
952
/// Trait for handlers that maintain internal state.
///
/// The event loop calls [`StatefulHandler::update_state`] after every reduced
/// event (including additional events emitted by an effect), keeping the
/// handler's internal snapshot in sync with the reducer's latest state.
pub trait StatefulHandler {
    /// Replace the handler's internal state with the latest reduced state.
    fn update_state(&mut self, state: PipelineState);
}
961
#[cfg(test)]
mod tests {
    use super::*;

    // Test bodies live in sibling files under `event_loop/` and are textually
    // spliced in via `include!`, so they share this module's `use super::*`
    // scope without needing their own module declarations.
    include!("event_loop/tests_trace_dump.rs");
    include!("event_loop/tests_checkpoint.rs");
    include!("event_loop/tests_review_flow.rs");
}