1use crate::logging::EventLoopLogger;
33use crate::phases::PhaseContext;
34use crate::reducer::effect::{Effect, EffectResult};
35use crate::reducer::event::{AwaitingDevFixEvent, CheckpointTrigger, PipelineEvent, PipelinePhase};
36use crate::reducer::state::ContinuationState;
37use crate::reducer::{
38 determine_next_effect, reduce, EffectHandler, MainEffectHandler, PipelineState,
39};
40use anyhow::Result;
41use serde::Serialize;
42use std::collections::VecDeque;
43use std::path::Path;
44use std::time::Instant;
45
46pub(crate) fn create_initial_state_with_config(ctx: &PhaseContext<'_>) -> PipelineState {
52 let max_dev_continuations = ctx.config.max_dev_continuations.unwrap_or(2);
56 let max_continue_count = 1 + max_dev_continuations;
57
58 let continuation = ContinuationState::with_limits(
59 ctx.config.max_xsd_retries.unwrap_or(10),
60 max_continue_count,
61 ctx.config.max_same_agent_retries.unwrap_or(2),
62 );
63 PipelineState::initial_with_continuation(
64 ctx.config.developer_iters,
65 ctx.config.reviewer_reviews,
66 continuation,
67 )
68}
69
/// Upper bound on event-loop iterations.
///
/// NOTE(review): not referenced in this part of the file; presumably the value
/// callers place in [`EventLoopConfig::max_iterations`] - confirm at call sites.
pub const MAX_EVENT_LOOP_ITERATIONS: usize = 1_000_000;
80
/// Settings for a single event-loop run.
#[derive(Clone, Debug)]
pub struct EventLoopConfig {
    /// The loop keeps executing effects only while the number of processed
    /// events is below this value.
    pub max_iterations: usize,
}
87
/// Outcome reported by the event loop when it stops.
#[derive(Debug, Clone)]
pub struct EventLoopResult {
    /// Whether the run is reported as finished; on the normal exit path this
    /// is the final state's `is_complete()`.
    pub completed: bool,
    /// Number of events that were reduced into the pipeline state.
    pub events_processed: usize,
    /// Phase of the pipeline state at exit.
    pub final_phase: PipelinePhase,
    /// Full pipeline state at exit.
    pub final_state: PipelineState,
}
100
/// Number of most recent trace entries the event loop keeps in memory for a
/// diagnostic dump.
const DEFAULT_EVENT_LOOP_TRACE_CAPACITY: usize = 200;
102
/// One line of the event-loop trace: an effect, the event it produced, and a
/// snapshot of selected state fields taken from the state passed to
/// `build_trace_entry`.
#[derive(Clone, Serialize, Debug)]
struct EventTraceEntry {
    /// Value of the processed-events counter when the entry was recorded.
    iteration: usize,
    /// Debug rendering of the effect (or a fixed label for forced steps).
    effect: String,
    /// Debug rendering of the event.
    event: String,
    /// Debug rendering of `state.phase`.
    phase: String,
    /// Copy of `state.continuation.xsd_retry_pending`.
    xsd_retry_pending: bool,
    /// Copy of `state.continuation.xsd_retry_count`.
    xsd_retry_count: u32,
    /// Copy of `state.continuation.invalid_output_attempts`.
    invalid_output_attempts: u32,
    /// Copy of `state.agent_chain.current_agent_index`.
    agent_index: usize,
    /// Copy of `state.agent_chain.current_model_index`.
    model_index: usize,
    /// Copy of `state.agent_chain.retry_cycle`.
    retry_cycle: u32,
}
116
/// Bounded FIFO of trace entries; the oldest entries are dropped once
/// `capacity` is exceeded.
#[derive(Debug)]
struct EventTraceBuffer {
    /// Maximum number of entries retained (at least 1).
    capacity: usize,
    /// Retained entries, oldest first.
    entries: VecDeque<EventTraceEntry>,
}
122
123impl EventTraceBuffer {
124 fn new(capacity: usize) -> Self {
125 Self {
126 capacity: capacity.max(1),
127 entries: VecDeque::new(),
128 }
129 }
130
131 fn push(&mut self, entry: EventTraceEntry) {
132 self.entries.push_back(entry);
133 while self.entries.len() > self.capacity {
134 self.entries.pop_front();
135 }
136 }
137
138 fn entries(&self) -> &VecDeque<EventTraceEntry> {
139 &self.entries
140 }
141}
142
/// Closing record of a trace dump: why the dump was written and the full
/// pipeline state at that moment.
#[derive(Serialize)]
struct EventTraceFinalState<'a> {
    /// Record discriminator; the dump writes `"final_state"` here.
    kind: &'static str,
    /// Reason label supplied by the caller of the dump.
    reason: &'a str,
    /// Pipeline state being dumped.
    state: &'a PipelineState,
}
149
150fn build_trace_entry(
151 iteration: usize,
152 state: &PipelineState,
153 effect: &str,
154 event: &str,
155) -> EventTraceEntry {
156 EventTraceEntry {
157 iteration,
158 effect: effect.to_string(),
159 event: event.to_string(),
160 phase: format!("{:?}", state.phase),
161 xsd_retry_pending: state.continuation.xsd_retry_pending,
162 xsd_retry_count: state.continuation.xsd_retry_count,
163 invalid_output_attempts: state.continuation.invalid_output_attempts,
164 agent_index: state.agent_chain.current_agent_index,
165 model_index: state.agent_chain.current_model_index,
166 retry_cycle: state.agent_chain.retry_cycle,
167 }
168}
169
170fn extract_error_event(err: &anyhow::Error) -> Option<crate::reducer::event::ErrorEvent> {
195 for cause in err.chain() {
199 if let Some(error_event) = cause.downcast_ref::<crate::reducer::event::ErrorEvent>() {
200 return Some(error_event.clone());
201 }
202 }
203 None
204}
205
/// Outcome of running an effect handler under a panic guard.
enum GuardedEffectResult {
    /// The handler produced a result, or failed with an error whose chain
    /// carried an `ErrorEvent` that was converted into a `HandlerError` event.
    Ok(Box<EffectResult>),
    /// The handler failed with an error that carries no `ErrorEvent`.
    Unrecoverable(anyhow::Error),
    /// The handler panicked; the panic payload is discarded.
    Panic,
}
211
212fn execute_effect_guarded<'ctx, H>(
213 handler: &mut H,
214 effect: Effect,
215 ctx: &mut PhaseContext<'_>,
216 state: &PipelineState,
217) -> GuardedEffectResult
218where
219 H: EffectHandler<'ctx>,
220{
221 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
222 handler.execute(effect, ctx)
223 })) {
224 Ok(Ok(result)) => GuardedEffectResult::Ok(Box::new(result)),
225 Ok(Err(err)) => {
226 if let Some(error_event) = extract_error_event(&err) {
227 GuardedEffectResult::Ok(Box::new(crate::reducer::effect::EffectResult::event(
228 crate::reducer::event::PipelineEvent::PromptInput(
229 crate::reducer::event::PromptInputEvent::HandlerError {
230 phase: state.phase,
231 error: error_event,
232 },
233 ),
234 )))
235 } else {
236 GuardedEffectResult::Unrecoverable(err)
237 }
238 }
239 Err(_) => GuardedEffectResult::Panic,
240 }
241}
242
243fn dump_event_loop_trace(
244 ctx: &mut PhaseContext<'_>,
245 trace: &EventTraceBuffer,
246 final_state: &PipelineState,
247 reason: &str,
248) -> bool {
249 let mut out = String::new();
250
251 for entry in trace.entries() {
252 match serde_json::to_string(entry) {
253 Ok(line) => {
254 out.push_str(&line);
255 out.push('\n');
256 }
257 Err(err) => {
258 ctx.logger.error(&format!(
259 "Failed to serialize event loop trace entry: {err}"
260 ));
261 }
262 }
263 }
264
265 let final_line = match serde_json::to_string(&EventTraceFinalState {
266 kind: "final_state",
267 reason,
268 state: final_state,
269 }) {
270 Ok(line) => line,
271 Err(err) => {
272 ctx.logger.error(&format!(
273 "Failed to serialize event loop final state: {err}"
274 ));
275 format!(
277 "{{\"kind\":\"final_state\",\"reason\":{},\"phase\":{}}}",
278 serde_json::to_string(reason).unwrap_or_else(|_| "\"unknown\"".to_string()),
279 serde_json::to_string(&format!("{:?}", final_state.phase))
280 .unwrap_or_else(|_| "\"unknown\"".to_string())
281 )
282 }
283 };
284 out.push_str(&final_line);
285 out.push('\n');
286
287 let trace_path = ctx.run_log_context.event_loop_trace();
289
290 if let Some(parent) = trace_path.parent() {
294 if let Err(err) = ctx.workspace.create_dir_all(parent) {
295 ctx.logger
296 .error(&format!("Failed to create trace directory: {err}"));
297 return false;
298 }
299 }
300
301 match ctx.workspace.write(&trace_path, &out) {
302 Ok(()) => true,
303 Err(err) => {
304 ctx.logger
305 .error(&format!("Failed to write event loop trace: {err}"));
306 false
307 }
308 }
309}
310
311fn write_completion_marker_on_error(ctx: &mut PhaseContext<'_>, err: &anyhow::Error) -> bool {
312 if let Err(err) = ctx.workspace.create_dir_all(Path::new(".agent/tmp")) {
313 ctx.logger.error(&format!(
314 "Failed to create completion marker directory: {err}"
315 ));
316 return false;
317 }
318
319 let marker_path = Path::new(".agent/tmp/completion_marker");
320 let content = format!("failure\nUnrecoverable handler error: {err}");
321 match ctx.workspace.write(marker_path, &content) {
322 Ok(()) => true,
323 Err(err) => {
324 ctx.logger.error(&format!(
325 "Failed to write completion marker for unrecoverable handler error: {err}"
326 ));
327 false
328 }
329 }
330}
331
332fn run_event_loop_with_handler_traced<'ctx, H>(
333 ctx: &mut PhaseContext<'_>,
334 initial_state: Option<PipelineState>,
335 config: EventLoopConfig,
336 handler: &mut H,
337) -> Result<EventLoopResult>
338where
339 H: EffectHandler<'ctx> + StatefulHandler,
340{
341 let mut state = initial_state.unwrap_or_else(|| create_initial_state_with_config(ctx));
342
343 handler.update_state(state.clone());
344 let mut events_processed = 0;
345 let mut trace = EventTraceBuffer::new(DEFAULT_EVENT_LOOP_TRACE_CAPACITY);
346
347 let event_loop_log_path = ctx.run_log_context.event_loop_log();
349 let mut event_loop_logger =
350 match EventLoopLogger::from_existing_log(ctx.workspace, &event_loop_log_path) {
351 Ok(logger) => logger,
352 Err(e) => {
353 ctx.logger.warn(&format!(
355 "Failed to read existing event loop log, starting fresh: {}",
356 e
357 ));
358 EventLoopLogger::new()
359 }
360 };
361
362 ctx.logger.info("Starting reducer-based event loop");
363
364 while events_processed < config.max_iterations {
365 let should_allow_checkpoint_save = matches!(state.phase, PipelinePhase::Interrupted)
377 && matches!(state.previous_phase, Some(PipelinePhase::AwaitingDevFix))
378 && state.checkpoint_saved_count == 0;
379
380 let is_awaiting_dev_fix_not_triggered =
381 matches!(state.phase, PipelinePhase::AwaitingDevFix) && !state.dev_fix_triggered;
382
383 if state.is_complete()
384 && !should_allow_checkpoint_save
385 && !is_awaiting_dev_fix_not_triggered
386 {
387 ctx.logger.info(&format!(
388 "Event loop: state already complete (phase: {:?}, checkpoint_saved_count: {})",
389 state.phase, state.checkpoint_saved_count
390 ));
391 break;
392 }
393
394 let effect = determine_next_effect(&state);
395 let effect_str = format!("{effect:?}");
396
397 let start_time = Instant::now();
400 let result = match execute_effect_guarded(handler, effect, ctx, &state) {
401 GuardedEffectResult::Ok(result) => *result,
402 GuardedEffectResult::Unrecoverable(err) => {
403 let dumped =
408 dump_event_loop_trace(ctx, &trace, &state, "unrecoverable_handler_error");
409 let marker_written = write_completion_marker_on_error(ctx, &err);
410 if dumped {
411 let trace_path = ctx.run_log_context.event_loop_trace();
412 ctx.logger.error(&format!(
413 "Event loop encountered unrecoverable handler error (trace: {})",
414 trace_path.display()
415 ));
416 } else {
417 ctx.logger
418 .error("Event loop encountered unrecoverable handler error");
419 }
420 if marker_written {
421 ctx.logger
422 .info("Completion marker written for unrecoverable handler error");
423 }
424
425 let failure_event = PipelineEvent::PromptInput(
429 crate::reducer::event::PromptInputEvent::HandlerError {
430 phase: state.phase,
431 error: crate::reducer::event::ErrorEvent::WorkspaceWriteFailed {
432 path: "(unrecoverable_handler_error)".to_string(),
433 kind: crate::reducer::event::WorkspaceIoErrorKind::Other,
434 },
435 },
436 );
437
438 let event_str = format!("{:?}", failure_event);
439 let duration_ms = start_time.elapsed().as_millis() as u64;
440 let new_state = reduce(state, failure_event);
441
442 let context_pairs: Vec<(&str, String)> = vec![
444 ("iteration", new_state.iteration.to_string()),
445 ("reviewer_pass", new_state.reviewer_pass.to_string()),
446 ("error_kind", "unrecoverable_failure".to_string()),
447 ("effect", effect_str.clone()),
448 ];
449 let context_refs: Vec<(&str, &str)> = context_pairs
450 .iter()
451 .map(|(k, v)| (*k, v.as_str()))
452 .collect();
453 if let Err(e) = event_loop_logger.log_effect(crate::logging::LogEffectParams {
454 workspace: ctx.workspace,
455 log_path: &ctx.run_log_context.event_loop_log(),
456 phase: new_state.phase,
457 effect: &effect_str,
458 primary_event: &event_str,
459 extra_events: &[],
460 duration_ms,
461 context: &context_refs,
462 }) {
463 ctx.logger
464 .warn(&format!("Failed to write to event loop log: {}", e));
465 }
466
467 trace.push(build_trace_entry(
468 events_processed,
469 &new_state,
470 &effect_str,
471 &event_str,
472 ));
473 handler.update_state(new_state.clone());
474 state = new_state;
475 events_processed += 1;
476
477 continue;
478 }
479 GuardedEffectResult::Panic => {
480 let dumped = dump_event_loop_trace(ctx, &trace, &state, "panic");
484 if dumped {
485 let trace_path = ctx.run_log_context.event_loop_trace();
486 ctx.logger.error(&format!(
487 "Event loop recovered from panic (trace: {})",
488 trace_path.display()
489 ));
490 } else {
491 ctx.logger.error("Event loop recovered from panic");
492 }
493
494 if let Err(err) = ctx.workspace.create_dir_all(Path::new(".agent/tmp")) {
496 ctx.logger.error(&format!(
497 "Failed to create completion marker directory: {err}"
498 ));
499 }
500 let marker_path = Path::new(".agent/tmp/completion_marker");
501 let content = format!(
502 "failure\nHandler panic in effect execution (phase={:?}, events_processed={})",
503 state.phase, events_processed
504 );
505 if let Err(err) = ctx.workspace.write(marker_path, &content) {
506 ctx.logger.error(&format!(
507 "Failed to write completion marker for handler panic: {err}"
508 ));
509 }
510
511 let failure_event = PipelineEvent::PromptInput(
512 crate::reducer::event::PromptInputEvent::HandlerError {
513 phase: state.phase,
514 error: crate::reducer::event::ErrorEvent::WorkspaceWriteFailed {
515 path: "(handler_panic)".to_string(),
516 kind: crate::reducer::event::WorkspaceIoErrorKind::Other,
517 },
518 },
519 );
520
521 let event_str = format!("{:?}", failure_event);
522 let duration_ms = start_time.elapsed().as_millis() as u64;
523 let new_state = reduce(state, failure_event);
524
525 let context_pairs: Vec<(&str, String)> = vec![
527 ("iteration", new_state.iteration.to_string()),
528 ("reviewer_pass", new_state.reviewer_pass.to_string()),
529 ("error_kind", "handler_panic".to_string()),
530 ("effect", effect_str.clone()),
531 ];
532 let context_refs: Vec<(&str, &str)> = context_pairs
533 .iter()
534 .map(|(k, v)| (*k, v.as_str()))
535 .collect();
536 if let Err(e) = event_loop_logger.log_effect(crate::logging::LogEffectParams {
537 workspace: ctx.workspace,
538 log_path: &ctx.run_log_context.event_loop_log(),
539 phase: new_state.phase,
540 effect: &effect_str,
541 primary_event: &event_str,
542 extra_events: &[],
543 duration_ms,
544 context: &context_refs,
545 }) {
546 ctx.logger
547 .warn(&format!("Failed to write to event loop log: {}", e));
548 }
549
550 trace.push(build_trace_entry(
551 events_processed,
552 &new_state,
553 &effect_str,
554 &event_str,
555 ));
556 handler.update_state(new_state.clone());
557 state = new_state;
558 events_processed += 1;
559
560 continue;
561 }
562 };
563
564 for ui_event in &result.ui_events {
566 ctx.logger
567 .info(&crate::rendering::render_ui_event(ui_event));
568 }
569
570 let event_str = format!("{:?}", result.event);
571 let duration_ms = start_time.elapsed().as_millis() as u64;
572
573 let new_state = reduce(state, result.event.clone());
575
576 let extra_events: Vec<String> = result
578 .additional_events
579 .iter()
580 .map(|e| format!("{:?}", e))
581 .collect();
582 let context_pairs: Vec<(&str, String)> = vec![
583 ("iteration", new_state.iteration.to_string()),
584 ("reviewer_pass", new_state.reviewer_pass.to_string()),
585 ];
586 let context_refs: Vec<(&str, &str)> = context_pairs
587 .iter()
588 .map(|(k, v)| (*k, v.as_str()))
589 .collect();
590 if let Err(e) = event_loop_logger.log_effect(crate::logging::LogEffectParams {
591 workspace: ctx.workspace,
592 log_path: &ctx.run_log_context.event_loop_log(),
593 phase: new_state.phase,
594 effect: &effect_str,
595 primary_event: &event_str,
596 extra_events: &extra_events,
597 duration_ms,
598 context: &context_refs,
599 }) {
600 ctx.logger
601 .warn(&format!("Failed to write to event loop log: {}", e));
602 }
603
604 trace.push(build_trace_entry(
605 events_processed,
606 &new_state,
607 &effect_str,
608 &event_str,
609 ));
610 handler.update_state(new_state.clone());
611 state = new_state;
612 events_processed += 1;
613
614 for event in result.additional_events {
616 let event_str = format!("{:?}", event);
617 let additional_state = reduce(state, event);
618 trace.push(build_trace_entry(
619 events_processed,
620 &additional_state,
621 &effect_str,
622 &event_str,
623 ));
624 handler.update_state(additional_state.clone());
625 state = additional_state;
626 events_processed += 1;
627 }
628
629 let current_fingerprint = crate::reducer::compute_effect_fingerprint(&state);
633 state = PipelineState {
634 continuation: state
635 .continuation
636 .update_loop_detection_counters(current_fingerprint),
637 ..state
638 };
639 handler.update_state(state.clone());
640
641 let should_allow_checkpoint_save = matches!(state.phase, PipelinePhase::Interrupted)
654 && matches!(state.previous_phase, Some(PipelinePhase::AwaitingDevFix))
655 && state.checkpoint_saved_count == 0;
656
657 let is_awaiting_dev_fix_not_triggered =
658 matches!(state.phase, PipelinePhase::AwaitingDevFix) && !state.dev_fix_triggered;
659
660 if state.is_complete()
661 && !should_allow_checkpoint_save
662 && !is_awaiting_dev_fix_not_triggered
663 {
664 ctx.logger.info(&format!(
665 "Event loop: state became complete (phase: {:?}, checkpoint_saved_count: {})",
666 state.phase, state.checkpoint_saved_count
667 ));
668
669 if matches!(state.phase, PipelinePhase::Interrupted)
672 && matches!(state.previous_phase, Some(PipelinePhase::AwaitingDevFix))
673 && state.checkpoint_saved_count == 0
674 {
675 ctx.logger.warn(
676 "Interrupted phase reached from AwaitingDevFix without checkpoint saved. \
677 SaveCheckpoint effect should execute on next iteration.",
678 );
679 }
680
681 break;
682 }
683 }
684
685 let mut forced_completion = false;
687
688 let should_force_checkpoint_after_completion =
689 matches!(state.phase, PipelinePhase::Interrupted)
690 && matches!(state.previous_phase, Some(PipelinePhase::AwaitingDevFix))
691 && state.checkpoint_saved_count == 0;
692
693 if events_processed >= config.max_iterations && should_force_checkpoint_after_completion {
694 ctx.logger.warn(
695 "Max iterations reached after completion marker; forcing SaveCheckpoint execution",
696 );
697
698 let save_effect = Effect::SaveCheckpoint {
699 trigger: CheckpointTrigger::Interrupt,
700 };
701 let save_effect_str = format!("{save_effect:?}");
702 match execute_effect_guarded(handler, save_effect, ctx, &state) {
703 GuardedEffectResult::Ok(result) => {
704 let result = *result;
705 let event_str = format!("{:?}", result.event);
706 state = reduce(state, result.event.clone());
707 trace.push(build_trace_entry(
708 events_processed,
709 &state,
710 &save_effect_str,
711 &event_str,
712 ));
713 handler.update_state(state.clone());
714 events_processed += 1;
715
716 for event in result.additional_events {
717 let event_str = format!("{:?}", event);
718 state = reduce(state, event);
719 trace.push(build_trace_entry(
720 events_processed,
721 &state,
722 &save_effect_str,
723 &event_str,
724 ));
725 handler.update_state(state.clone());
726 events_processed += 1;
727 }
728 }
729 GuardedEffectResult::Unrecoverable(err) => {
730 let dumped =
733 dump_event_loop_trace(ctx, &trace, &state, "unrecoverable_handler_error");
734 let marker_written = write_completion_marker_on_error(ctx, &err);
735 if dumped {
736 let trace_path = ctx.run_log_context.event_loop_trace();
737 ctx.logger.error(&format!(
738 "Event loop encountered unrecoverable handler error (trace: {})",
739 trace_path.display()
740 ));
741 } else {
742 ctx.logger
743 .error("Event loop encountered unrecoverable handler error");
744 }
745 if marker_written {
746 ctx.logger
747 .info("Completion marker written for unrecoverable handler error");
748 }
749
750 return Ok(EventLoopResult {
754 completed: true,
755 events_processed,
756 final_phase: state.phase,
757 final_state: state.clone(),
758 });
759 }
760 GuardedEffectResult::Panic => {
761 let dumped = dump_event_loop_trace(ctx, &trace, &state, "panic");
763 if dumped {
764 let trace_path = ctx.run_log_context.event_loop_trace();
765 ctx.logger.error(&format!(
766 "Event loop recovered from panic (trace: {})",
767 trace_path.display()
768 ));
769 } else {
770 ctx.logger.error("Event loop recovered from panic");
771 }
772
773 if let Err(err) = ctx.workspace.create_dir_all(Path::new(".agent/tmp")) {
774 ctx.logger.error(&format!(
775 "Failed to create completion marker directory: {err}"
776 ));
777 }
778 let marker_path = Path::new(".agent/tmp/completion_marker");
779 let content = format!(
780 "failure\nHandler panic during forced completion (phase={:?}, events_processed={})",
781 state.phase, events_processed
782 );
783 if let Err(err) = ctx.workspace.write(marker_path, &content) {
784 ctx.logger.error(&format!(
785 "Failed to write completion marker for handler panic: {err}"
786 ));
787 }
788
789 return Ok(EventLoopResult {
793 completed: true,
794 events_processed,
795 final_phase: state.phase,
796 final_state: state.clone(),
797 });
798 }
799 }
800 }
801
802 if events_processed >= config.max_iterations && !state.is_complete() {
803 let dumped = dump_event_loop_trace(ctx, &trace, &state, "max_iterations");
804
805 if matches!(state.phase, PipelinePhase::AwaitingDevFix) {
815 ctx.logger.error(
816 "BUG: Hit max iterations in AwaitingDevFix phase. \
817 TriggerDevFixFlow should have executed before reaching this point. \
818 Applying defensive recovery logic.",
819 );
820 ctx.logger
821 .warn("Max iterations reached in AwaitingDevFix - forcing completion marker");
822
823 if let Err(err) = ctx.workspace.create_dir_all(Path::new(".agent/tmp")) {
825 ctx.logger.error(&format!(
826 "Failed to create completion marker directory: {err}"
827 ));
828 }
829 let marker_path = Path::new(".agent/tmp/completion_marker");
830 let content = format!(
831 "failure\nMax iterations reached in AwaitingDevFix phase (events_processed={})",
832 events_processed
833 );
834 match ctx.workspace.write(marker_path, &content) {
835 Ok(()) => {
836 ctx.logger
837 .info("Completion marker written for max iterations failure");
838 }
839 Err(err) => {
840 ctx.logger.error(&format!(
841 "Failed to write completion marker for max iterations failure: {err}"
842 ));
843 }
844 }
845
846 let completion_event =
847 PipelineEvent::AwaitingDevFix(AwaitingDevFixEvent::CompletionMarkerEmitted {
848 is_failure: true,
849 });
850 let completion_event_str = format!("{:?}", completion_event);
851 state = reduce(state, completion_event);
852 trace.push(build_trace_entry(
853 events_processed,
854 &state,
855 "ForcedCompletionMarker",
856 &completion_event_str,
857 ));
858 handler.update_state(state.clone());
859 events_processed += 1;
860
861 let save_effect = Effect::SaveCheckpoint {
862 trigger: CheckpointTrigger::Interrupt,
863 };
864 let save_effect_str = format!("{save_effect:?}");
865 match execute_effect_guarded(handler, save_effect, ctx, &state) {
866 GuardedEffectResult::Ok(result) => {
867 let result = *result;
868 let event_str = format!("{:?}", result.event);
869 state = reduce(state, result.event.clone());
870 trace.push(build_trace_entry(
871 events_processed,
872 &state,
873 &save_effect_str,
874 &event_str,
875 ));
876 handler.update_state(state.clone());
877 events_processed += 1;
878
879 for event in result.additional_events {
880 let event_str = format!("{:?}", event);
881 state = reduce(state, event);
882 trace.push(build_trace_entry(
883 events_processed,
884 &state,
885 &save_effect_str,
886 &event_str,
887 ));
888 handler.update_state(state.clone());
889 events_processed += 1;
890 }
891 }
892 GuardedEffectResult::Unrecoverable(err) => {
893 let dumped =
896 dump_event_loop_trace(ctx, &trace, &state, "unrecoverable_handler_error");
897 let marker_written = write_completion_marker_on_error(ctx, &err);
898 if dumped {
899 let trace_path = ctx.run_log_context.event_loop_trace();
900 ctx.logger.error(&format!(
901 "Event loop encountered unrecoverable handler error (trace: {})",
902 trace_path.display()
903 ));
904 } else {
905 ctx.logger
906 .error("Event loop encountered unrecoverable handler error");
907 }
908 if marker_written {
909 ctx.logger
910 .info("Completion marker written for unrecoverable handler error");
911 }
912
913 return Ok(EventLoopResult {
918 completed: false,
919 events_processed,
920 final_phase: state.phase,
921 final_state: state.clone(),
922 });
923 }
924 GuardedEffectResult::Panic => {
925 let dumped = dump_event_loop_trace(ctx, &trace, &state, "panic");
927 if dumped {
928 let trace_path = ctx.run_log_context.event_loop_trace();
929 ctx.logger.error(&format!(
930 "Event loop recovered from panic (trace: {})",
931 trace_path.display()
932 ));
933 } else {
934 ctx.logger.error("Event loop recovered from panic");
935 }
936
937 if let Err(err) = ctx.workspace.create_dir_all(Path::new(".agent/tmp")) {
938 ctx.logger.error(&format!(
939 "Failed to create completion marker directory: {err}"
940 ));
941 }
942 let marker_path = Path::new(".agent/tmp/completion_marker");
943 let content = format!(
944 "failure\nHandler panic during forced completion (phase={:?}, events_processed={})",
945 state.phase, events_processed
946 );
947 if let Err(err) = ctx.workspace.write(marker_path, &content) {
948 ctx.logger.error(&format!(
949 "Failed to write completion marker for handler panic: {err}"
950 ));
951 }
952
953 return Ok(EventLoopResult {
958 completed: false,
959 events_processed,
960 final_phase: state.phase,
961 final_state: state.clone(),
962 });
963 }
964 }
965
966 forced_completion = true;
967
968 ctx.logger
969 .info("Forced transition to Interrupted phase to satisfy termination requirements");
970 }
971
972 if dumped {
973 let trace_path = ctx.run_log_context.event_loop_trace();
974 ctx.logger.warn(&format!(
975 "Event loop reached max iterations ({}) without completion (trace: {})",
976 config.max_iterations,
977 trace_path.display()
978 ));
979 } else {
980 ctx.logger.warn(&format!(
981 "Event loop reached max iterations ({}) without completion",
982 config.max_iterations
983 ));
984 }
985
986 if !forced_completion {
987 ctx.logger.error(&format!(
988 "Event loop exiting: reason=max_iterations, phase={:?}, checkpoint_saved_count={}, events_processed={}",
989 state.phase, state.checkpoint_saved_count, events_processed
990 ));
991 }
992 }
993
994 let completed = state.is_complete();
995 if !completed {
996 ctx.logger.warn(&format!(
997 "Event loop exiting without completion: phase={:?}, checkpoint_saved_count={}, \
998 previous_phase={:?}, events_processed={}",
999 state.phase, state.checkpoint_saved_count, state.previous_phase, events_processed
1000 ));
1001 ctx.logger.info(&format!(
1002 "Final state: agent_chain.retry_cycle={}, agent_chain.current_role={:?}",
1003 state.agent_chain.retry_cycle, state.agent_chain.current_role
1004 ));
1005 }
1006
1007 Ok(EventLoopResult {
1008 completed,
1009 events_processed,
1010 final_phase: state.phase,
1011 final_state: state.clone(),
1012 })
1013}
1014
1015pub fn run_event_loop(
1036 ctx: &mut PhaseContext<'_>,
1037 initial_state: Option<PipelineState>,
1038 config: EventLoopConfig,
1039) -> Result<EventLoopResult> {
1040 let state = initial_state.unwrap_or_else(|| create_initial_state_with_config(ctx));
1041 let mut handler = MainEffectHandler::new(state.clone());
1042 run_event_loop_with_handler_traced(ctx, Some(state), config, &mut handler)
1043}
1044
/// Runs the event loop with a caller-supplied effect handler.
///
/// Public entry point that forwards all arguments unchanged to
/// `run_event_loop_with_handler_traced`. The handler must also implement
/// [`StatefulHandler`] so the loop can push each new state to it.
pub fn run_event_loop_with_handler<'ctx, H>(
    ctx: &mut PhaseContext<'_>,
    initial_state: Option<PipelineState>,
    config: EventLoopConfig,
    handler: &mut H,
) -> Result<EventLoopResult>
where
    H: EffectHandler<'ctx> + StatefulHandler,
{
    run_event_loop_with_handler_traced(ctx, initial_state, config, handler)
}
1071
/// Implemented by effect handlers that hold their own copy of the pipeline
/// state.
pub trait StatefulHandler {
    /// Replaces the handler's view of the pipeline state. The event loop calls
    /// this once before the first effect and again after every state change.
    fn update_state(&mut self, state: PipelineState);
}
1080
// Unit tests for the event loop. The test bodies live in separate files that
// are textually included here, so they share this module's scope and its
// `use super::*` import.
#[cfg(test)]
mod tests {
    use super::*;

    include!("event_loop/tests_trace_dump.rs");
    include!("event_loop/tests_checkpoint.rs");
    include!("event_loop/tests_review_flow.rs");
}