Skip to main content

ralph_workflow/app/
event_loop.rs

1//! Event loop for reducer-based pipeline architecture.
2//!
3//! This module implements main event loop that coordinates reducer,
4//! effect handlers, and orchestration logic. It provides a unified way to
5//! run the pipeline using the event-sourced architecture from RFC-004.
6//!
7//! # Non-Terminating Pipeline Architecture
8//!
9//! The pipeline is designed to be **non-terminating by default** for unattended operation.
10//! It must NEVER exit early due to internal failures, budget exhaustion, or agent errors.
11//!
12//! ## Failure Handling Flow
13//!
14//! 1. Any terminal failure (Status: Failed, budget exhausted, agent chain exhausted)
15//!    transitions to `AwaitingDevFix` phase
16//! 2. `TriggerDevFixFlow` effect writes completion marker to `.agent/tmp/completion_marker`
17//! 3. Dev-fix agent is optionally dispatched for remediation attempt
18//! 4. `CompletionMarkerEmitted` event transitions to `Interrupted` phase
19//! 5. `SaveCheckpoint` effect saves state for resume
20//! 6. Event loop returns `EventLoopResult { completed: true, ... }`
21//!
22//! ## Acceptable Termination Reasons
23//!
24//! The ONLY acceptable reasons for pipeline termination are catastrophic external events:
25//! - Process termination (SIGKILL)
26//! - Machine outage / power loss
27//! - OS kill signal
28//! - Unrecoverable panic in effect handler (caught and logged)
29//!
30//! All internal errors route through the failure handling flow above.
31
32use crate::logging::EventLoopLogger;
33use crate::phases::PhaseContext;
34use crate::reducer::effect::{Effect, EffectResult};
35use crate::reducer::event::{AwaitingDevFixEvent, CheckpointTrigger, PipelineEvent, PipelinePhase};
36use crate::reducer::state::ContinuationState;
37use crate::reducer::{
38    determine_next_effect, reduce, EffectHandler, MainEffectHandler, PipelineState,
39};
40use anyhow::Result;
41use serde::Serialize;
42use std::collections::VecDeque;
43use std::path::Path;
44use std::time::Instant;
45
46/// Create initial pipeline state with continuation limits from config.
47///
48/// This function creates a `PipelineState` with XSD retry and continuation limits
49/// loaded from the config, ensuring these values are available for the reducer
50/// to make deterministic retry decisions.
51pub(crate) fn create_initial_state_with_config(ctx: &PhaseContext<'_>) -> PipelineState {
52    // Config semantics: max_dev_continuations counts continuation attempts *beyond*
53    // the initial attempt. ContinuationState::max_continue_count semantics are
54    // "maximum total attempts including initial".
55    let max_dev_continuations = ctx.config.max_dev_continuations.unwrap_or(2);
56    let max_continue_count = 1 + max_dev_continuations;
57
58    let continuation = ContinuationState::with_limits(
59        ctx.config.max_xsd_retries.unwrap_or(10),
60        max_continue_count,
61        ctx.config.max_same_agent_retries.unwrap_or(2),
62    );
63    PipelineState::initial_with_continuation(
64        ctx.config.developer_iters,
65        ctx.config.reviewer_reviews,
66        continuation,
67    )
68}
69
70/// Maximum iterations for the main event loop to prevent infinite loops.
71///
72/// This is a safety limit - the pipeline should complete well before this limit
73/// under normal circumstances. If reached, it indicates either a bug in the
74/// reducer logic or an extremely complex project.
75///
76/// NOTE: Even 1_000_000 can still be too low for extremely slow-progress runs.
77/// If this cap is hit in practice, prefer making it configurable and/or
78/// investigating why the reducer is not converging.
79pub const MAX_EVENT_LOOP_ITERATIONS: usize = 1_000_000;
80
81/// Configuration for event loop.
82#[derive(Clone, Debug)]
83pub struct EventLoopConfig {
84    /// Maximum number of iterations to prevent infinite loops.
85    pub max_iterations: usize,
86}
87
88/// Result of event loop execution.
89#[derive(Debug, Clone)]
90pub struct EventLoopResult {
91    /// Whether pipeline completed successfully.
92    pub completed: bool,
93    /// Total events processed.
94    pub events_processed: usize,
95    /// Final reducer phase when the loop stopped.
96    pub final_phase: PipelinePhase,
97    /// Final pipeline state (for metrics and summary).
98    pub final_state: PipelineState,
99}
100
101const DEFAULT_EVENT_LOOP_TRACE_CAPACITY: usize = 200;
102
103#[derive(Clone, Serialize, Debug)]
104struct EventTraceEntry {
105    iteration: usize,
106    effect: String,
107    event: String,
108    phase: String,
109    xsd_retry_pending: bool,
110    xsd_retry_count: u32,
111    invalid_output_attempts: u32,
112    agent_index: usize,
113    model_index: usize,
114    retry_cycle: u32,
115}
116
117#[derive(Debug)]
118struct EventTraceBuffer {
119    capacity: usize,
120    entries: VecDeque<EventTraceEntry>,
121}
122
123impl EventTraceBuffer {
124    fn new(capacity: usize) -> Self {
125        Self {
126            capacity: capacity.max(1),
127            entries: VecDeque::new(),
128        }
129    }
130
131    fn push(&mut self, entry: EventTraceEntry) {
132        self.entries.push_back(entry);
133        while self.entries.len() > self.capacity {
134            self.entries.pop_front();
135        }
136    }
137
138    fn entries(&self) -> &VecDeque<EventTraceEntry> {
139        &self.entries
140    }
141}
142
143#[derive(Serialize)]
144struct EventTraceFinalState<'a> {
145    kind: &'static str,
146    reason: &'a str,
147    state: &'a PipelineState,
148}
149
150fn build_trace_entry(
151    iteration: usize,
152    state: &PipelineState,
153    effect: &str,
154    event: &str,
155) -> EventTraceEntry {
156    EventTraceEntry {
157        iteration,
158        effect: effect.to_string(),
159        event: event.to_string(),
160        phase: format!("{:?}", state.phase),
161        xsd_retry_pending: state.continuation.xsd_retry_pending,
162        xsd_retry_count: state.continuation.xsd_retry_count,
163        invalid_output_attempts: state.continuation.invalid_output_attempts,
164        agent_index: state.agent_chain.current_agent_index,
165        model_index: state.agent_chain.current_model_index,
166        retry_cycle: state.agent_chain.retry_cycle,
167    }
168}
169
170/// Extract ErrorEvent from anyhow::Error if present.
171///
172/// # Error Event Processing Architecture
173///
174/// Effect handlers return errors through `Err(ErrorEvent::Variant.into())`. This function
175/// extracts the original `ErrorEvent` so it can be processed through the reducer.
176///
177/// ## Why Downcast?
178///
179/// When an effect handler returns `Err(ErrorEvent::Variant.into())`, the error is wrapped
180/// in an `anyhow::Error`. Since `ErrorEvent` implements `std::error::Error`, anyhow's
181/// blanket `From` implementation preserves the original error type, allowing us to downcast
182/// back to `ErrorEvent` for reducer processing.
183///
184/// ## Processing Flow
185///
186/// 1. Handler returns `Err(ErrorEvent::AgentChainExhausted { ... }.into())`
187/// 2. Event loop catches the error and calls this function
188/// 3. If downcast succeeds, wrap in `PipelineEvent::PromptInput(PromptInputEvent::HandlerError { ... })`
189///    and process through reducer
190/// 4. If downcast fails, return `Err()` to terminate the event loop (truly unrecoverable error)
191///
192/// This architecture allows the reducer to decide recovery strategy based on the specific
193/// error type, rather than terminating immediately on any `Err()`.
194fn extract_error_event(err: &anyhow::Error) -> Option<crate::reducer::event::ErrorEvent> {
195    // Handlers are allowed to wrap typed ErrorEvents with additional context
196    // (e.g. via `anyhow::Context`). Search the full error chain so we still
197    // recover the underlying reducer error event.
198    for cause in err.chain() {
199        if let Some(error_event) = cause.downcast_ref::<crate::reducer::event::ErrorEvent>() {
200            return Some(error_event.clone());
201        }
202    }
203    None
204}
205
206enum GuardedEffectResult {
207    Ok(Box<EffectResult>),
208    Unrecoverable(anyhow::Error),
209    Panic,
210}
211
212fn execute_effect_guarded<'ctx, H>(
213    handler: &mut H,
214    effect: Effect,
215    ctx: &mut PhaseContext<'_>,
216    state: &PipelineState,
217) -> GuardedEffectResult
218where
219    H: EffectHandler<'ctx>,
220{
221    match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
222        handler.execute(effect, ctx)
223    })) {
224        Ok(Ok(result)) => GuardedEffectResult::Ok(Box::new(result)),
225        Ok(Err(err)) => {
226            if let Some(error_event) = extract_error_event(&err) {
227                GuardedEffectResult::Ok(Box::new(crate::reducer::effect::EffectResult::event(
228                    crate::reducer::event::PipelineEvent::PromptInput(
229                        crate::reducer::event::PromptInputEvent::HandlerError {
230                            phase: state.phase,
231                            error: error_event,
232                        },
233                    ),
234                )))
235            } else {
236                GuardedEffectResult::Unrecoverable(err)
237            }
238        }
239        Err(_) => GuardedEffectResult::Panic,
240    }
241}
242
243fn dump_event_loop_trace(
244    ctx: &mut PhaseContext<'_>,
245    trace: &EventTraceBuffer,
246    final_state: &PipelineState,
247    reason: &str,
248) -> bool {
249    let mut out = String::new();
250
251    for entry in trace.entries() {
252        match serde_json::to_string(entry) {
253            Ok(line) => {
254                out.push_str(&line);
255                out.push('\n');
256            }
257            Err(err) => {
258                ctx.logger.error(&format!(
259                    "Failed to serialize event loop trace entry: {err}"
260                ));
261            }
262        }
263    }
264
265    let final_line = match serde_json::to_string(&EventTraceFinalState {
266        kind: "final_state",
267        reason,
268        state: final_state,
269    }) {
270        Ok(line) => line,
271        Err(err) => {
272            ctx.logger.error(&format!(
273                "Failed to serialize event loop final state: {err}"
274            ));
275            // Ensure the file still contains something useful.
276            format!(
277                "{{\"kind\":\"final_state\",\"reason\":{},\"phase\":{}}}",
278                serde_json::to_string(reason).unwrap_or_else(|_| "\"unknown\"".to_string()),
279                serde_json::to_string(&format!("{:?}", final_state.phase))
280                    .unwrap_or_else(|_| "\"unknown\"".to_string())
281            )
282        }
283    };
284    out.push_str(&final_line);
285    out.push('\n');
286
287    // Get trace path from run log context
288    let trace_path = ctx.run_log_context.event_loop_trace();
289
290    // Ensure the trace directory exists. While `Workspace::write` is expected to
291    // create parent directories, we do this explicitly so trace dumping is
292    // resilient even under stricter workspace implementations.
293    if let Some(parent) = trace_path.parent() {
294        if let Err(err) = ctx.workspace.create_dir_all(parent) {
295            ctx.logger
296                .error(&format!("Failed to create trace directory: {err}"));
297            return false;
298        }
299    }
300
301    match ctx.workspace.write(&trace_path, &out) {
302        Ok(()) => true,
303        Err(err) => {
304            ctx.logger
305                .error(&format!("Failed to write event loop trace: {err}"));
306            false
307        }
308    }
309}
310
311fn write_completion_marker_on_error(ctx: &mut PhaseContext<'_>, err: &anyhow::Error) -> bool {
312    if let Err(err) = ctx.workspace.create_dir_all(Path::new(".agent/tmp")) {
313        ctx.logger.error(&format!(
314            "Failed to create completion marker directory: {err}"
315        ));
316        return false;
317    }
318
319    let marker_path = Path::new(".agent/tmp/completion_marker");
320    let content = format!("failure\nUnrecoverable handler error: {err}");
321    match ctx.workspace.write(marker_path, &content) {
322        Ok(()) => true,
323        Err(err) => {
324            ctx.logger.error(&format!(
325                "Failed to write completion marker for unrecoverable handler error: {err}"
326            ));
327            false
328        }
329    }
330}
331
332fn run_event_loop_with_handler_traced<'ctx, H>(
333    ctx: &mut PhaseContext<'_>,
334    initial_state: Option<PipelineState>,
335    config: EventLoopConfig,
336    handler: &mut H,
337) -> Result<EventLoopResult>
338where
339    H: EffectHandler<'ctx> + StatefulHandler,
340{
341    let mut state = initial_state.unwrap_or_else(|| create_initial_state_with_config(ctx));
342
343    handler.update_state(state.clone());
344    let mut events_processed = 0;
345    let mut trace = EventTraceBuffer::new(DEFAULT_EVENT_LOOP_TRACE_CAPACITY);
346
347    // Create event loop logger, continuing from existing log if present (resume case)
348    let event_loop_log_path = ctx.run_log_context.event_loop_log();
349    let mut event_loop_logger =
350        match EventLoopLogger::from_existing_log(ctx.workspace, &event_loop_log_path) {
351            Ok(logger) => logger,
352            Err(e) => {
353                // If reading existing log fails, log a warning and start fresh
354                ctx.logger.warn(&format!(
355                    "Failed to read existing event loop log, starting fresh: {}",
356                    e
357                ));
358                EventLoopLogger::new()
359            }
360        };
361
362    ctx.logger.info("Starting reducer-based event loop");
363
364    while events_processed < config.max_iterations {
365        // Check if we're already in a terminal state before executing any effect.
366        // This handles the case where the initial state is already complete
367        // (e.g., resuming from an Interrupted checkpoint).
368        //
369        // Special case: If we just transitioned to Interrupted from AwaitingDevFix
370        // without a checkpoint, allow one more iteration to execute SaveCheckpoint.
371        //
372        // CRITICAL: If we're in AwaitingDevFix and haven't executed TriggerDevFixFlow yet,
373        // allow at least one iteration to execute it, even if approaching max iterations.
374        // This ensures completion marker is ALWAYS written and dev-fix agent is ALWAYS
375        // dispatched before the event loop can exit.
376        let should_allow_checkpoint_save = matches!(state.phase, PipelinePhase::Interrupted)
377            && matches!(state.previous_phase, Some(PipelinePhase::AwaitingDevFix))
378            && state.checkpoint_saved_count == 0;
379
380        let is_awaiting_dev_fix_not_triggered =
381            matches!(state.phase, PipelinePhase::AwaitingDevFix) && !state.dev_fix_triggered;
382
383        if state.is_complete()
384            && !should_allow_checkpoint_save
385            && !is_awaiting_dev_fix_not_triggered
386        {
387            ctx.logger.info(&format!(
388                "Event loop: state already complete (phase: {:?}, checkpoint_saved_count: {})",
389                state.phase, state.checkpoint_saved_count
390            ));
391            break;
392        }
393
394        let effect = determine_next_effect(&state);
395        let effect_str = format!("{effect:?}");
396
397        // Execute returns EffectResult with both PipelineEvent and UIEvents.
398        // Catch panics here so we can still dump a best-effort trace.
399        let start_time = Instant::now();
400        let result = match execute_effect_guarded(handler, effect, ctx, &state) {
401            GuardedEffectResult::Ok(result) => *result,
402            GuardedEffectResult::Unrecoverable(err) => {
403                // Non-terminating-by-default requirement:
404                // Even "unrecoverable" handler errors must route through reducer-visible
405                // remediation (AwaitingDevFix) so TriggerDevFixFlow can write the completion
406                // marker and dispatch dev-fix.
407                let dumped =
408                    dump_event_loop_trace(ctx, &trace, &state, "unrecoverable_handler_error");
409                let marker_written = write_completion_marker_on_error(ctx, &err);
410                if dumped {
411                    let trace_path = ctx.run_log_context.event_loop_trace();
412                    ctx.logger.error(&format!(
413                        "Event loop encountered unrecoverable handler error (trace: {})",
414                        trace_path.display()
415                    ));
416                } else {
417                    ctx.logger
418                        .error("Event loop encountered unrecoverable handler error");
419                }
420                if marker_written {
421                    ctx.logger
422                        .info("Completion marker written for unrecoverable handler error");
423                }
424
425                // Emit a reducer-visible error that transitions us into AwaitingDevFix.
426                // We don't preserve the original error as a typed ErrorEvent; this is a last-resort
427                // path to guarantee remediation and completion marker semantics.
428                let failure_event = PipelineEvent::PromptInput(
429                    crate::reducer::event::PromptInputEvent::HandlerError {
430                        phase: state.phase,
431                        error: crate::reducer::event::ErrorEvent::WorkspaceWriteFailed {
432                            path: "(unrecoverable_handler_error)".to_string(),
433                            kind: crate::reducer::event::WorkspaceIoErrorKind::Other,
434                        },
435                    },
436                );
437
438                let event_str = format!("{:?}", failure_event);
439                let duration_ms = start_time.elapsed().as_millis() as u64;
440                let new_state = reduce(state, failure_event);
441
442                // Log to event loop log (best-effort, does not affect correctness)
443                let context_pairs: Vec<(&str, String)> = vec![
444                    ("iteration", new_state.iteration.to_string()),
445                    ("reviewer_pass", new_state.reviewer_pass.to_string()),
446                    ("error_kind", "unrecoverable_failure".to_string()),
447                    ("effect", effect_str.clone()),
448                ];
449                let context_refs: Vec<(&str, &str)> = context_pairs
450                    .iter()
451                    .map(|(k, v)| (*k, v.as_str()))
452                    .collect();
453                if let Err(e) = event_loop_logger.log_effect(crate::logging::LogEffectParams {
454                    workspace: ctx.workspace,
455                    log_path: &ctx.run_log_context.event_loop_log(),
456                    phase: new_state.phase,
457                    effect: &effect_str,
458                    primary_event: &event_str,
459                    extra_events: &[],
460                    duration_ms,
461                    context: &context_refs,
462                }) {
463                    ctx.logger
464                        .warn(&format!("Failed to write to event loop log: {}", e));
465                }
466
467                trace.push(build_trace_entry(
468                    events_processed,
469                    &new_state,
470                    &effect_str,
471                    &event_str,
472                ));
473                handler.update_state(new_state.clone());
474                state = new_state;
475                events_processed += 1;
476
477                continue;
478            }
479            GuardedEffectResult::Panic => {
480                // Handler panics are internal failures and must not terminate the run.
481                // Route through AwaitingDevFix so TriggerDevFixFlow writes the completion marker and
482                // dispatches dev-fix.
483                let dumped = dump_event_loop_trace(ctx, &trace, &state, "panic");
484                if dumped {
485                    let trace_path = ctx.run_log_context.event_loop_trace();
486                    ctx.logger.error(&format!(
487                        "Event loop recovered from panic (trace: {})",
488                        trace_path.display()
489                    ));
490                } else {
491                    ctx.logger.error("Event loop recovered from panic");
492                }
493
494                // Best-effort completion marker for orchestration, even if the dev-fix flow fails.
495                if let Err(err) = ctx.workspace.create_dir_all(Path::new(".agent/tmp")) {
496                    ctx.logger.error(&format!(
497                        "Failed to create completion marker directory: {err}"
498                    ));
499                }
500                let marker_path = Path::new(".agent/tmp/completion_marker");
501                let content = format!(
502                    "failure\nHandler panic in effect execution (phase={:?}, events_processed={})",
503                    state.phase, events_processed
504                );
505                if let Err(err) = ctx.workspace.write(marker_path, &content) {
506                    ctx.logger.error(&format!(
507                        "Failed to write completion marker for handler panic: {err}"
508                    ));
509                }
510
511                let failure_event = PipelineEvent::PromptInput(
512                    crate::reducer::event::PromptInputEvent::HandlerError {
513                        phase: state.phase,
514                        error: crate::reducer::event::ErrorEvent::WorkspaceWriteFailed {
515                            path: "(handler_panic)".to_string(),
516                            kind: crate::reducer::event::WorkspaceIoErrorKind::Other,
517                        },
518                    },
519                );
520
521                let event_str = format!("{:?}", failure_event);
522                let duration_ms = start_time.elapsed().as_millis() as u64;
523                let new_state = reduce(state, failure_event);
524
525                // Log to event loop log (best-effort, does not affect correctness)
526                let context_pairs: Vec<(&str, String)> = vec![
527                    ("iteration", new_state.iteration.to_string()),
528                    ("reviewer_pass", new_state.reviewer_pass.to_string()),
529                    ("error_kind", "handler_panic".to_string()),
530                    ("effect", effect_str.clone()),
531                ];
532                let context_refs: Vec<(&str, &str)> = context_pairs
533                    .iter()
534                    .map(|(k, v)| (*k, v.as_str()))
535                    .collect();
536                if let Err(e) = event_loop_logger.log_effect(crate::logging::LogEffectParams {
537                    workspace: ctx.workspace,
538                    log_path: &ctx.run_log_context.event_loop_log(),
539                    phase: new_state.phase,
540                    effect: &effect_str,
541                    primary_event: &event_str,
542                    extra_events: &[],
543                    duration_ms,
544                    context: &context_refs,
545                }) {
546                    ctx.logger
547                        .warn(&format!("Failed to write to event loop log: {}", e));
548                }
549
550                trace.push(build_trace_entry(
551                    events_processed,
552                    &new_state,
553                    &effect_str,
554                    &event_str,
555                ));
556                handler.update_state(new_state.clone());
557                state = new_state;
558                events_processed += 1;
559
560                continue;
561            }
562        };
563
564        // Display UI events (does not affect state)
565        for ui_event in &result.ui_events {
566            ctx.logger
567                .info(&crate::rendering::render_ui_event(ui_event));
568        }
569
570        let event_str = format!("{:?}", result.event);
571        let duration_ms = start_time.elapsed().as_millis() as u64;
572
573        // Apply pipeline event to state (reducer remains pure)
574        let new_state = reduce(state, result.event.clone());
575
576        // Log to event loop log (best-effort, does not affect correctness)
577        let extra_events: Vec<String> = result
578            .additional_events
579            .iter()
580            .map(|e| format!("{:?}", e))
581            .collect();
582        let context_pairs: Vec<(&str, String)> = vec![
583            ("iteration", new_state.iteration.to_string()),
584            ("reviewer_pass", new_state.reviewer_pass.to_string()),
585        ];
586        let context_refs: Vec<(&str, &str)> = context_pairs
587            .iter()
588            .map(|(k, v)| (*k, v.as_str()))
589            .collect();
590        if let Err(e) = event_loop_logger.log_effect(crate::logging::LogEffectParams {
591            workspace: ctx.workspace,
592            log_path: &ctx.run_log_context.event_loop_log(),
593            phase: new_state.phase,
594            effect: &effect_str,
595            primary_event: &event_str,
596            extra_events: &extra_events,
597            duration_ms,
598            context: &context_refs,
599        }) {
600            ctx.logger
601                .warn(&format!("Failed to write to event loop log: {}", e));
602        }
603
604        trace.push(build_trace_entry(
605            events_processed,
606            &new_state,
607            &effect_str,
608            &event_str,
609        ));
610        handler.update_state(new_state.clone());
611        state = new_state;
612        events_processed += 1;
613
614        // Apply additional pipeline events in order.
615        for event in result.additional_events {
616            let event_str = format!("{:?}", event);
617            let additional_state = reduce(state, event);
618            trace.push(build_trace_entry(
619                events_processed,
620                &additional_state,
621                &effect_str,
622                &event_str,
623            ));
624            handler.update_state(additional_state.clone());
625            state = additional_state;
626            events_processed += 1;
627        }
628
629        // Update loop detection counters AFTER all events have been processed.
630        // This is critical: additional events can change phase, agent chain, etc.,
631        // and loop detection must consider the final state after all reductions.
632        let current_fingerprint = crate::reducer::compute_effect_fingerprint(&state);
633        state = PipelineState {
634            continuation: state
635                .continuation
636                .update_loop_detection_counters(current_fingerprint),
637            ..state
638        };
639        handler.update_state(state.clone());
640
641        // Check completion AFTER effect execution and state update.
642        // This ensures that transitions to terminal phases (e.g., Interrupted)
643        // have a chance to save their checkpoint before the loop exits.
644        //
645        // Special case: If we just transitioned to Interrupted from AwaitingDevFix
646        // without a checkpoint, allow one more iteration to execute SaveCheckpoint.
647        // This is needed because TriggerDevFixFlow already wrote the completion marker,
648        // making is_complete() return true, but we still want to save the checkpoint
649        // for proper state persistence.
650        //
651        // CRITICAL: If we're in AwaitingDevFix and haven't executed TriggerDevFixFlow yet,
652        // allow at least one iteration to execute it, even if approaching max iterations.
653        let should_allow_checkpoint_save = matches!(state.phase, PipelinePhase::Interrupted)
654            && matches!(state.previous_phase, Some(PipelinePhase::AwaitingDevFix))
655            && state.checkpoint_saved_count == 0;
656
657        let is_awaiting_dev_fix_not_triggered =
658            matches!(state.phase, PipelinePhase::AwaitingDevFix) && !state.dev_fix_triggered;
659
660        if state.is_complete()
661            && !should_allow_checkpoint_save
662            && !is_awaiting_dev_fix_not_triggered
663        {
664            ctx.logger.info(&format!(
665                "Event loop: state became complete (phase: {:?}, checkpoint_saved_count: {})",
666                state.phase, state.checkpoint_saved_count
667            ));
668
669            // DEFENSIVE: If we're in Interrupted from AwaitingDevFix without a checkpoint,
670            // warn that SaveCheckpoint should execute next
671            if matches!(state.phase, PipelinePhase::Interrupted)
672                && matches!(state.previous_phase, Some(PipelinePhase::AwaitingDevFix))
673                && state.checkpoint_saved_count == 0
674            {
675                ctx.logger.warn(
676                    "Interrupted phase reached from AwaitingDevFix without checkpoint saved. \
677                     SaveCheckpoint effect should execute on next iteration.",
678                );
679            }
680
681            break;
682        }
683    }
684
685    // Track if we had to force-complete due to max iterations in AwaitingDevFix
686    let mut forced_completion = false;
687
688    let should_force_checkpoint_after_completion =
689        matches!(state.phase, PipelinePhase::Interrupted)
690            && matches!(state.previous_phase, Some(PipelinePhase::AwaitingDevFix))
691            && state.checkpoint_saved_count == 0;
692
693    if events_processed >= config.max_iterations && should_force_checkpoint_after_completion {
694        ctx.logger.warn(
695            "Max iterations reached after completion marker; forcing SaveCheckpoint execution",
696        );
697
698        let save_effect = Effect::SaveCheckpoint {
699            trigger: CheckpointTrigger::Interrupt,
700        };
701        let save_effect_str = format!("{save_effect:?}");
702        match execute_effect_guarded(handler, save_effect, ctx, &state) {
703            GuardedEffectResult::Ok(result) => {
704                let result = *result;
705                let event_str = format!("{:?}", result.event);
706                state = reduce(state, result.event.clone());
707                trace.push(build_trace_entry(
708                    events_processed,
709                    &state,
710                    &save_effect_str,
711                    &event_str,
712                ));
713                handler.update_state(state.clone());
714                events_processed += 1;
715
716                for event in result.additional_events {
717                    let event_str = format!("{:?}", event);
718                    state = reduce(state, event);
719                    trace.push(build_trace_entry(
720                        events_processed,
721                        &state,
722                        &save_effect_str,
723                        &event_str,
724                    ));
725                    handler.update_state(state.clone());
726                    events_processed += 1;
727                }
728            }
729            GuardedEffectResult::Unrecoverable(err) => {
730                // Non-terminating-by-default: even failures while forcing checkpoint completion
731                // must route through AwaitingDevFix rather than returning Err.
732                let dumped =
733                    dump_event_loop_trace(ctx, &trace, &state, "unrecoverable_handler_error");
734                let marker_written = write_completion_marker_on_error(ctx, &err);
735                if dumped {
736                    let trace_path = ctx.run_log_context.event_loop_trace();
737                    ctx.logger.error(&format!(
738                        "Event loop encountered unrecoverable handler error (trace: {})",
739                        trace_path.display()
740                    ));
741                } else {
742                    ctx.logger
743                        .error("Event loop encountered unrecoverable handler error");
744                }
745                if marker_written {
746                    ctx.logger
747                        .info("Completion marker written for unrecoverable handler error");
748                }
749
750                // We can't safely continue execution here (we are outside the main loop).
751                // State is already terminal (Interrupted from AwaitingDevFix), so return completion
752                // even if SaveCheckpoint fails.
753                return Ok(EventLoopResult {
754                    completed: true,
755                    events_processed,
756                    final_phase: state.phase,
757                    final_state: state.clone(),
758                });
759            }
760            GuardedEffectResult::Panic => {
761                // Panics during forced completion are internal failures; route through AwaitingDevFix.
762                let dumped = dump_event_loop_trace(ctx, &trace, &state, "panic");
763                if dumped {
764                    let trace_path = ctx.run_log_context.event_loop_trace();
765                    ctx.logger.error(&format!(
766                        "Event loop recovered from panic (trace: {})",
767                        trace_path.display()
768                    ));
769                } else {
770                    ctx.logger.error("Event loop recovered from panic");
771                }
772
773                if let Err(err) = ctx.workspace.create_dir_all(Path::new(".agent/tmp")) {
774                    ctx.logger.error(&format!(
775                        "Failed to create completion marker directory: {err}"
776                    ));
777                }
778                let marker_path = Path::new(".agent/tmp/completion_marker");
779                let content = format!(
780                    "failure\nHandler panic during forced completion (phase={:?}, events_processed={})",
781                    state.phase, events_processed
782                );
783                if let Err(err) = ctx.workspace.write(marker_path, &content) {
784                    ctx.logger.error(&format!(
785                        "Failed to write completion marker for handler panic: {err}"
786                    ));
787                }
788
789                // We can't safely continue execution here (we are outside the main loop).
790                // State is already terminal (Interrupted from AwaitingDevFix), so return completion
791                // even if SaveCheckpoint fails.
792                return Ok(EventLoopResult {
793                    completed: true,
794                    events_processed,
795                    final_phase: state.phase,
796                    final_state: state.clone(),
797                });
798            }
799        }
800    }
801
802    if events_processed >= config.max_iterations && !state.is_complete() {
803        let dumped = dump_event_loop_trace(ctx, &trace, &state, "max_iterations");
804
805        // DEFENSIVE: If max iterations reached in AwaitingDevFix without dev_fix_triggered,
806        // this is a bug (TriggerDevFixFlow should execute first). However, to maintain the
807        // non-terminating pipeline principle, we force completion:
808        // 1. Write completion marker (signals orchestration)
809        // 2. Emit CompletionMarkerEmitted event (transitions to Interrupted)
810        // 3. Execute SaveCheckpoint (makes is_complete() return true)
811        //
812        // This ensures the pipeline NEVER exits early due to internal logic bugs.
813        // Budget exhaustion should transition to commit/finalization, not terminate.
814        if matches!(state.phase, PipelinePhase::AwaitingDevFix) {
815            ctx.logger.error(
816                "BUG: Hit max iterations in AwaitingDevFix phase. \
817                 TriggerDevFixFlow should have executed before reaching this point. \
818                 Applying defensive recovery logic.",
819            );
820            ctx.logger
821                .warn("Max iterations reached in AwaitingDevFix - forcing completion marker");
822
823            // Write completion marker
824            if let Err(err) = ctx.workspace.create_dir_all(Path::new(".agent/tmp")) {
825                ctx.logger.error(&format!(
826                    "Failed to create completion marker directory: {err}"
827                ));
828            }
829            let marker_path = Path::new(".agent/tmp/completion_marker");
830            let content = format!(
831                "failure\nMax iterations reached in AwaitingDevFix phase (events_processed={})",
832                events_processed
833            );
834            match ctx.workspace.write(marker_path, &content) {
835                Ok(()) => {
836                    ctx.logger
837                        .info("Completion marker written for max iterations failure");
838                }
839                Err(err) => {
840                    ctx.logger.error(&format!(
841                        "Failed to write completion marker for max iterations failure: {err}"
842                    ));
843                }
844            }
845
846            let completion_event =
847                PipelineEvent::AwaitingDevFix(AwaitingDevFixEvent::CompletionMarkerEmitted {
848                    is_failure: true,
849                });
850            let completion_event_str = format!("{:?}", completion_event);
851            state = reduce(state, completion_event);
852            trace.push(build_trace_entry(
853                events_processed,
854                &state,
855                "ForcedCompletionMarker",
856                &completion_event_str,
857            ));
858            handler.update_state(state.clone());
859            events_processed += 1;
860
861            let save_effect = Effect::SaveCheckpoint {
862                trigger: CheckpointTrigger::Interrupt,
863            };
864            let save_effect_str = format!("{save_effect:?}");
865            match execute_effect_guarded(handler, save_effect, ctx, &state) {
866                GuardedEffectResult::Ok(result) => {
867                    let result = *result;
868                    let event_str = format!("{:?}", result.event);
869                    state = reduce(state, result.event.clone());
870                    trace.push(build_trace_entry(
871                        events_processed,
872                        &state,
873                        &save_effect_str,
874                        &event_str,
875                    ));
876                    handler.update_state(state.clone());
877                    events_processed += 1;
878
879                    for event in result.additional_events {
880                        let event_str = format!("{:?}", event);
881                        state = reduce(state, event);
882                        trace.push(build_trace_entry(
883                            events_processed,
884                            &state,
885                            &save_effect_str,
886                            &event_str,
887                        ));
888                        handler.update_state(state.clone());
889                        events_processed += 1;
890                    }
891                }
892                GuardedEffectResult::Unrecoverable(err) => {
893                    // Non-terminating-by-default: even errors during forced completion must route
894                    // through AwaitingDevFix instead of returning Err.
895                    let dumped =
896                        dump_event_loop_trace(ctx, &trace, &state, "unrecoverable_handler_error");
897                    let marker_written = write_completion_marker_on_error(ctx, &err);
898                    if dumped {
899                        let trace_path = ctx.run_log_context.event_loop_trace();
900                        ctx.logger.error(&format!(
901                            "Event loop encountered unrecoverable handler error (trace: {})",
902                            trace_path.display()
903                        ));
904                    } else {
905                        ctx.logger
906                            .error("Event loop encountered unrecoverable handler error");
907                    }
908                    if marker_written {
909                        ctx.logger
910                            .info("Completion marker written for unrecoverable handler error");
911                    }
912
913                    // We can't safely continue execution here (we are outside the main loop).
914                    // State is already terminal (Interrupted from AwaitingDevFix).
915                    // However, the run did NOT complete cleanly, so report incomplete while still
916                    // having written a best-effort completion marker above.
917                    return Ok(EventLoopResult {
918                        completed: false,
919                        events_processed,
920                        final_phase: state.phase,
921                        final_state: state.clone(),
922                    });
923                }
924                GuardedEffectResult::Panic => {
925                    // Panics during forced completion are internal failures; route through AwaitingDevFix.
926                    let dumped = dump_event_loop_trace(ctx, &trace, &state, "panic");
927                    if dumped {
928                        let trace_path = ctx.run_log_context.event_loop_trace();
929                        ctx.logger.error(&format!(
930                            "Event loop recovered from panic (trace: {})",
931                            trace_path.display()
932                        ));
933                    } else {
934                        ctx.logger.error("Event loop recovered from panic");
935                    }
936
937                    if let Err(err) = ctx.workspace.create_dir_all(Path::new(".agent/tmp")) {
938                        ctx.logger.error(&format!(
939                            "Failed to create completion marker directory: {err}"
940                        ));
941                    }
942                    let marker_path = Path::new(".agent/tmp/completion_marker");
943                    let content = format!(
944                        "failure\nHandler panic during forced completion (phase={:?}, events_processed={})",
945                        state.phase, events_processed
946                    );
947                    if let Err(err) = ctx.workspace.write(marker_path, &content) {
948                        ctx.logger.error(&format!(
949                            "Failed to write completion marker for handler panic: {err}"
950                        ));
951                    }
952
953                    // We can't safely continue execution here (we are outside the main loop).
954                    // State is already terminal (Interrupted from AwaitingDevFix).
955                    // However, the run did NOT complete cleanly, so report incomplete while still
956                    // having written a best-effort completion marker above.
957                    return Ok(EventLoopResult {
958                        completed: false,
959                        events_processed,
960                        final_phase: state.phase,
961                        final_state: state.clone(),
962                    });
963                }
964            }
965
966            forced_completion = true;
967
968            ctx.logger
969                .info("Forced transition to Interrupted phase to satisfy termination requirements");
970        }
971
972        if dumped {
973            let trace_path = ctx.run_log_context.event_loop_trace();
974            ctx.logger.warn(&format!(
975                "Event loop reached max iterations ({}) without completion (trace: {})",
976                config.max_iterations,
977                trace_path.display()
978            ));
979        } else {
980            ctx.logger.warn(&format!(
981                "Event loop reached max iterations ({}) without completion",
982                config.max_iterations
983            ));
984        }
985
986        if !forced_completion {
987            ctx.logger.error(&format!(
988                "Event loop exiting: reason=max_iterations, phase={:?}, checkpoint_saved_count={}, events_processed={}",
989                state.phase, state.checkpoint_saved_count, events_processed
990            ));
991        }
992    }
993
994    let completed = state.is_complete();
995    if !completed {
996        ctx.logger.warn(&format!(
997            "Event loop exiting without completion: phase={:?}, checkpoint_saved_count={}, \
998             previous_phase={:?}, events_processed={}",
999            state.phase, state.checkpoint_saved_count, state.previous_phase, events_processed
1000        ));
1001        ctx.logger.info(&format!(
1002            "Final state: agent_chain.retry_cycle={}, agent_chain.current_role={:?}",
1003            state.agent_chain.retry_cycle, state.agent_chain.current_role
1004        ));
1005    }
1006
1007    Ok(EventLoopResult {
1008        completed,
1009        events_processed,
1010        final_phase: state.phase,
1011        final_state: state.clone(),
1012    })
1013}
1014
1015/// Run the main event loop for the reducer-based pipeline.
1016///
1017/// This function orchestrates pipeline execution by repeatedly:
1018/// 1. Determining the next effect based on the current state
1019/// 2. Executing the effect through the effect handler (which performs side effects)
1020/// 3. Applying the resulting event to state through the reducer (pure function)
1021/// 4. Repeating until a terminal state is reached or max iterations exceeded
1022///
1023/// The entire event loop is wrapped in panic recovery to ensure the pipeline
1024/// never crashes due to agent failures (panics only; aborts/segfaults cannot be recovered).
1025///
1026/// # Arguments
1027///
1028/// * `ctx` - Phase context for effect handlers
1029/// * `initial_state` - Optional initial state (if None, creates a new state)
1030/// * `config` - Event loop configuration
1031///
1032/// # Returns
1033///
1034/// Returns the event loop result containing the completion status and final state.
1035pub fn run_event_loop(
1036    ctx: &mut PhaseContext<'_>,
1037    initial_state: Option<PipelineState>,
1038    config: EventLoopConfig,
1039) -> Result<EventLoopResult> {
1040    let state = initial_state.unwrap_or_else(|| create_initial_state_with_config(ctx));
1041    let mut handler = MainEffectHandler::new(state.clone());
1042    run_event_loop_with_handler_traced(ctx, Some(state), config, &mut handler)
1043}
1044
1045/// Run the event loop with a custom effect handler.
1046///
1047/// This variant allows injecting a custom effect handler for testing.
1048/// The handler must implement `EffectHandler` and `StatefulHandler` traits.
1049///
1050/// # Arguments
1051///
1052/// * `ctx` - Phase context for effect handlers
1053/// * `initial_state` - Optional initial state (if None, creates a new state)
1054/// * `config` - Event loop configuration
1055/// * `handler` - Custom effect handler (e.g., MockEffectHandler for testing)
1056///
1057/// # Returns
1058///
1059/// Returns the event loop result containing the completion status and final state.
1060pub fn run_event_loop_with_handler<'ctx, H>(
1061    ctx: &mut PhaseContext<'_>,
1062    initial_state: Option<PipelineState>,
1063    config: EventLoopConfig,
1064    handler: &mut H,
1065) -> Result<EventLoopResult>
1066where
1067    H: EffectHandler<'ctx> + StatefulHandler,
1068{
1069    run_event_loop_with_handler_traced(ctx, initial_state, config, handler)
1070}
1071
1072/// Trait for handlers that maintain internal state.
1073///
1074/// This trait allows the event loop to update the handler's internal state
1075/// after each event is processed.
1076pub trait StatefulHandler {
1077    /// Update the handler's internal state.
1078    fn update_state(&mut self, state: PipelineState);
1079}
1080
1081#[cfg(test)]
1082mod tests {
1083    use super::*;
1084
1085    include!("event_loop/tests_trace_dump.rs");
1086    include!("event_loop/tests_checkpoint.rs");
1087    include!("event_loop/tests_review_flow.rs");
1088}