Skip to main content

ralph_core/event_loop/
mod.rs

1//! Event loop orchestration.
2//!
3//! The event loop coordinates the execution of hats via pub/sub messaging.
4
5mod loop_state;
6#[cfg(test)]
7mod tests;
8
9pub use loop_state::LoopState;
10
11use crate::config::{HatBackend, InjectMode, RalphConfig};
12use crate::event_parser::{EventParser, MutationEvidence, MutationStatus};
13use crate::event_reader::EventReader;
14use crate::hat_registry::HatRegistry;
15use crate::hatless_ralph::HatlessRalph;
16use crate::instructions::InstructionBuilder;
17use crate::loop_context::LoopContext;
18use crate::memory_store::{MarkdownMemoryStore, format_memories_as_markdown, truncate_to_budget};
19use crate::skill_registry::SkillRegistry;
20use crate::text::floor_char_boundary;
21use ralph_proto::{CheckinContext, Event, EventBus, Hat, HatId, RobotService};
22use serde_json::{Map, Value};
23use std::path::PathBuf;
24use std::sync::Arc;
25use std::sync::atomic::AtomicBool;
26use std::time::Duration;
27use tracing::{debug, info, warn};
28
29/// Result of processing events from JSONL.
30#[derive(Debug, Clone)]
31pub struct ProcessedEvents {
32    /// Whether any valid events were found and published.
33    pub had_events: bool,
34    /// Whether any published events matched the semantic `plan.*` topic family.
35    pub had_plan_events: bool,
36    /// Structured context for the first processed `human.interact` event,
37    /// including the question payload and post-dispatch outcome metadata.
38    pub human_interact_context: Option<Value>,
39    /// Whether any events lacked specific hat subscribers (orphans handled by Ralph).
40    pub has_orphans: bool,
41}
42
43/// Result of processing events from JSONL with wave events partitioned out.
44#[derive(Debug)]
45pub struct ProcessedEventsWithWaves {
46    /// Normal event processing results.
47    pub processed: ProcessedEvents,
48    /// Wave events extracted before normal processing (have wave_id set).
49    pub wave_events: Vec<crate::event_reader::Event>,
50}
51
/// Why the event loop stopped running.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationReason {
    /// The completion promise was detected in output.
    CompletionPromise,
    /// The configured maximum number of iterations was reached.
    MaxIterations,
    /// The configured maximum runtime was exceeded.
    MaxRuntime,
    /// The configured maximum cost was exceeded.
    MaxCost,
    /// Too many failures occurred back to back.
    ConsecutiveFailures,
    /// Loop thrashing was detected (repeated blocked events).
    LoopThrashing,
    /// A stale loop was detected (same topic emitted 3+ times consecutively).
    LoopStale,
    /// Too many consecutive malformed JSONL lines were found in the events file.
    ValidationFailure,
    /// The loop was stopped manually.
    Stopped,
    /// The loop was interrupted by a signal (SIGINT/SIGTERM).
    Interrupted,
    /// A restart was requested via the Telegram `/restart` command.
    RestartRequested,
    /// The workspace directory (worktree) was removed externally.
    WorkspaceGone,
    /// The loop was cancelled gracefully via a `loop.cancel` event
    /// (human rejection, timeout).
    Cancelled,
}

impl TerminationReason {
    /// Maps the termination reason to a process exit code.
    ///
    /// Per the spec's "Loop Termination" section, extended by the restart
    /// and cancel cases handled here:
    /// - `0`: completion promise detected, or the loop was cancelled
    ///   intentionally (clean exit)
    /// - `1`: consecutive failures, thrashing, stale loop, validation
    ///   failure, manual stop, or missing workspace
    /// - `2`: max iterations, max runtime, or max cost exceeded (limit)
    /// - `3`: restart requested; signals the caller to exec-replace
    /// - `130`: user interrupt (SIGINT = 128 + 2)
    pub fn exit_code(&self) -> i32 {
        match self {
            // Clean exits: finished, or stopped on purpose.
            Self::CompletionPromise | Self::Cancelled => 0,
            // Failures.
            Self::ConsecutiveFailures
            | Self::LoopThrashing
            | Self::LoopStale
            | Self::ValidationFailure
            | Self::Stopped
            | Self::WorkspaceGone => 1,
            // Configured limits.
            Self::MaxIterations | Self::MaxRuntime | Self::MaxCost => 2,
            // The caller is expected to exec-replace on this code.
            Self::RestartRequested => 3,
            Self::Interrupted => 130,
        }
    }

    /// Returns the reason label used in the `loop.terminate` event payload.
    ///
    /// One of: `completed`, `max_iterations`, `max_runtime`, `max_cost`,
    /// `consecutive_failures`, `loop_thrashing`, `loop_stale`,
    /// `validation_failure`, `stopped`, `interrupted`, `restart_requested`,
    /// `workspace_gone`, `cancelled`.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::CompletionPromise => "completed",
            Self::MaxIterations => "max_iterations",
            Self::MaxRuntime => "max_runtime",
            Self::MaxCost => "max_cost",
            Self::ConsecutiveFailures => "consecutive_failures",
            Self::LoopThrashing => "loop_thrashing",
            Self::LoopStale => "loop_stale",
            Self::ValidationFailure => "validation_failure",
            Self::Stopped => "stopped",
            Self::Interrupted => "interrupted",
            Self::RestartRequested => "restart_requested",
            Self::WorkspaceGone => "workspace_gone",
            Self::Cancelled => "cancelled",
        }
    }

    /// Returns `true` only for [`TerminationReason::CompletionPromise`].
    ///
    /// `Cancelled` maps to exit code 0 but is not reported as a success here.
    pub fn is_success(&self) -> bool {
        *self == Self::CompletionPromise
    }
}
138
139/// The main event loop orchestrator.
140pub struct EventLoop {
141    config: RalphConfig,
142    registry: HatRegistry,
143    bus: EventBus,
144    state: LoopState,
145    instruction_builder: InstructionBuilder,
146    ralph: HatlessRalph,
147    /// Cached human guidance messages that should persist across iterations.
148    robot_guidance: Vec<String>,
149    /// Event reader for consuming events from JSONL file.
150    /// Made pub(crate) to allow tests to override the path.
151    pub(crate) event_reader: EventReader,
152    diagnostics: crate::diagnostics::DiagnosticsCollector,
153    /// Loop context for path resolution (None for legacy single-loop mode).
154    loop_context: Option<LoopContext>,
155    /// Skill registry for the current loop.
156    skill_registry: SkillRegistry,
157    /// Robot service for human-in-the-loop communication.
158    /// Injected externally when `human.enabled` is true and this is the primary loop.
159    robot_service: Option<Box<dyn RobotService>>,
160}
161
162impl EventLoop {
163    /// Creates a new event loop from configuration.
164    pub fn new(config: RalphConfig) -> Self {
165        // Try to create diagnostics collector, but fall back to disabled if it fails
166        // (e.g., in tests without proper directory setup)
167        let diagnostics = crate::diagnostics::DiagnosticsCollector::new(std::path::Path::new("."))
168            .unwrap_or_else(|e| {
169                debug!(
170                    "Failed to initialize diagnostics: {}, using disabled collector",
171                    e
172                );
173                crate::diagnostics::DiagnosticsCollector::disabled()
174            });
175
176        Self::with_diagnostics(config, diagnostics)
177    }
178
179    /// Creates a new event loop with a loop context for path resolution.
180    ///
181    /// The loop context determines where events, tasks, and other state files
182    /// are located. Use this for multi-loop scenarios where each loop runs
183    /// in an isolated workspace (git worktree).
184    pub fn with_context(config: RalphConfig, context: LoopContext) -> Self {
185        let diagnostics = crate::diagnostics::DiagnosticsCollector::new(context.workspace())
186            .unwrap_or_else(|e| {
187                debug!(
188                    "Failed to initialize diagnostics: {}, using disabled collector",
189                    e
190                );
191                crate::diagnostics::DiagnosticsCollector::disabled()
192            });
193
194        Self::with_context_and_diagnostics(config, context, diagnostics)
195    }
196
197    /// Creates a new event loop with explicit loop context and diagnostics.
198    pub fn with_context_and_diagnostics(
199        config: RalphConfig,
200        context: LoopContext,
201        diagnostics: crate::diagnostics::DiagnosticsCollector,
202    ) -> Self {
203        let registry = HatRegistry::from_config(&config);
204        let instruction_builder =
205            InstructionBuilder::with_events(config.core.clone(), config.events.clone());
206
207        let mut bus = EventBus::new();
208
209        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
210        // Ralph is ALWAYS registered as the universal fallback for orphaned events.
211        // Custom hats are registered first (higher priority), Ralph catches everything else.
212        for hat in registry.all() {
213            bus.register(hat.clone());
214        }
215
216        // Always register Ralph as catch-all coordinator
217        // Per spec: "Ralph runs when no hat triggered — Universal fallback for orphaned events"
218        let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); // Subscribe to all events
219        bus.register(ralph_hat);
220
221        if registry.is_empty() {
222            debug!("Solo mode: Ralph is the only coordinator");
223        } else {
224            debug!(
225                "Multi-hat mode: {} custom hats + Ralph as fallback",
226                registry.len()
227            );
228        }
229
230        // Build skill registry from config
231        let mut skill_registry = if config.skills.enabled {
232            SkillRegistry::from_config(
233                &config.skills,
234                context.workspace(),
235                Some(config.cli.backend.as_str()),
236            )
237            .unwrap_or_else(|e| {
238                warn!(
239                    "Failed to build skill registry: {}, using empty registry",
240                    e
241                );
242                SkillRegistry::new(Some(config.cli.backend.as_str()))
243            })
244        } else {
245            SkillRegistry::new(Some(config.cli.backend.as_str()))
246        };
247
248        // Remove task/memory skills from the index when their config is disabled
249        if !config.tasks.enabled {
250            skill_registry.remove("ralph-tools-tasks");
251        }
252        if !config.memories.enabled {
253            skill_registry.remove("ralph-tools-memories");
254        }
255
256        let skill_index = if config.skills.enabled {
257            skill_registry.build_index(None)
258        } else {
259            String::new()
260        };
261
262        // When memories are enabled, add tasks CLI instructions alongside scratchpad
263        let ralph = HatlessRalph::new(
264            config.event_loop.completion_promise.clone(),
265            config.core.clone(),
266            &registry,
267            config.event_loop.starting_event.clone(),
268        )
269        .with_memories_enabled(config.memories.enabled)
270        .with_skill_index(skill_index);
271
272        // Read timestamped events path from marker file, fall back to default
273        // The marker file contains a relative path like ".ralph/events-20260127-123456.jsonl"
274        // which we resolve relative to the workspace root
275        let events_path = std::fs::read_to_string(context.current_events_marker())
276            .map(|s| {
277                let relative = s.trim();
278                context.workspace().join(relative)
279            })
280            .unwrap_or_else(|_| context.events_path());
281        let event_reader = EventReader::new(&events_path);
282
283        Self {
284            config,
285            registry,
286            bus,
287            state: LoopState::new(),
288            instruction_builder,
289            ralph,
290            robot_guidance: Vec::new(),
291            event_reader,
292            diagnostics,
293            loop_context: Some(context),
294            skill_registry,
295            robot_service: None,
296        }
297    }
298
299    /// Creates a new event loop with explicit diagnostics collector (for testing).
300    pub fn with_diagnostics(
301        config: RalphConfig,
302        diagnostics: crate::diagnostics::DiagnosticsCollector,
303    ) -> Self {
304        let registry = HatRegistry::from_config(&config);
305        let instruction_builder =
306            InstructionBuilder::with_events(config.core.clone(), config.events.clone());
307
308        let mut bus = EventBus::new();
309
310        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
311        // Ralph is ALWAYS registered as the universal fallback for orphaned events.
312        // Custom hats are registered first (higher priority), Ralph catches everything else.
313        for hat in registry.all() {
314            bus.register(hat.clone());
315        }
316
317        // Always register Ralph as catch-all coordinator
318        // Per spec: "Ralph runs when no hat triggered — Universal fallback for orphaned events"
319        let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); // Subscribe to all events
320        bus.register(ralph_hat);
321
322        if registry.is_empty() {
323            debug!("Solo mode: Ralph is the only coordinator");
324        } else {
325            debug!(
326                "Multi-hat mode: {} custom hats + Ralph as fallback",
327                registry.len()
328            );
329        }
330
331        // Build skill registry from config
332        let workspace_root = std::path::Path::new(".");
333        let mut skill_registry = if config.skills.enabled {
334            SkillRegistry::from_config(
335                &config.skills,
336                workspace_root,
337                Some(config.cli.backend.as_str()),
338            )
339            .unwrap_or_else(|e| {
340                warn!(
341                    "Failed to build skill registry: {}, using empty registry",
342                    e
343                );
344                SkillRegistry::new(Some(config.cli.backend.as_str()))
345            })
346        } else {
347            SkillRegistry::new(Some(config.cli.backend.as_str()))
348        };
349
350        // Remove task/memory skills from the index when their config is disabled
351        if !config.tasks.enabled {
352            skill_registry.remove("ralph-tools-tasks");
353        }
354        if !config.memories.enabled {
355            skill_registry.remove("ralph-tools-memories");
356        }
357
358        let skill_index = if config.skills.enabled {
359            skill_registry.build_index(None)
360        } else {
361            String::new()
362        };
363
364        // When memories are enabled, add tasks CLI instructions alongside scratchpad
365        let ralph = HatlessRalph::new(
366            config.event_loop.completion_promise.clone(),
367            config.core.clone(),
368            &registry,
369            config.event_loop.starting_event.clone(),
370        )
371        .with_memories_enabled(config.memories.enabled)
372        .with_skill_index(skill_index);
373
374        // Read events path from marker file, fall back to default if not present
375        // The marker file is written by run_loop_impl() at run startup
376        let events_path = std::fs::read_to_string(".ralph/current-events")
377            .map(|s| s.trim().to_string())
378            .unwrap_or_else(|_| ".ralph/events.jsonl".to_string());
379        let event_reader = EventReader::new(&events_path);
380
381        Self {
382            config,
383            registry,
384            bus,
385            state: LoopState::new(),
386            instruction_builder,
387            ralph,
388            robot_guidance: Vec::new(),
389            event_reader,
390            diagnostics,
391            loop_context: None,
392            skill_registry,
393            robot_service: None,
394        }
395    }
396
397    /// Injects a robot service for human-in-the-loop communication.
398    ///
399    /// Call this after construction to enable `human.interact` event handling,
400    /// periodic check-ins, and question/response flow. The service is typically
401    /// created by the CLI layer (e.g., `TelegramService`) and injected here,
402    /// keeping the core event loop decoupled from any specific communication
403    /// platform.
404    pub fn set_robot_service(&mut self, service: Box<dyn RobotService>) {
405        self.robot_service = Some(service);
406    }
407
408    /// Returns the loop context, if one was provided.
409    pub fn loop_context(&self) -> Option<&LoopContext> {
410        self.loop_context.as_ref()
411    }
412
413    /// Returns the tasks path based on loop context or default.
414    fn tasks_path(&self) -> PathBuf {
415        self.loop_context
416            .as_ref()
417            .map(|ctx| ctx.tasks_path())
418            .unwrap_or_else(|| PathBuf::from(".ralph/agent/tasks.jsonl"))
419    }
420
421    /// Returns the scratchpad path based on config and loop context.
422    ///
423    /// If the config specifies a custom (non-default) scratchpad path, that
424    /// always wins. Otherwise, the loop context path is used for worktree
425    /// isolation, falling back to the config default.
426    fn scratchpad_path(&self) -> PathBuf {
427        if self.config.core.scratchpad != ".ralph/agent/scratchpad.md" {
428            return PathBuf::from(&self.config.core.scratchpad);
429        }
430        self.loop_context
431            .as_ref()
432            .map(|ctx| ctx.scratchpad_path())
433            .unwrap_or_else(|| PathBuf::from(&self.config.core.scratchpad))
434    }
435
436    /// Returns the current loop state.
437    pub fn state(&self) -> &LoopState {
438        &self.state
439    }
440
441    /// Resets the stale-loop topic counter.
442    ///
443    /// Call after processing wave results — multiple events with the same topic
444    /// (e.g. `review.done` from parallel workers) are expected and should not
445    /// trigger the stale loop detector.
446    pub fn reset_stale_topic_counter(&mut self) {
447        self.state.consecutive_same_signature = 0;
448        self.state.last_emitted_signature = None;
449    }
450
451    /// Returns the configuration.
452    pub fn config(&self) -> &RalphConfig {
453        &self.config
454    }
455
456    /// Returns the hat registry.
457    pub fn registry(&self) -> &HatRegistry {
458        &self.registry
459    }
460
461    /// Records hook telemetry for diagnostics.
462    pub fn log_hook_run_telemetry(&self, entry: crate::diagnostics::HookRunTelemetryEntry) {
463        self.diagnostics.log_hook_run(entry);
464    }
465
466    /// Logs the full prompt for an iteration to the diagnostics session.
467    pub fn log_prompt(&self, iteration: u32, hat: &str, prompt: &str) {
468        self.diagnostics.log_prompt(iteration, hat, prompt);
469    }
470
471    /// Gets the backend configuration for a hat.
472    ///
473    /// If the hat has a backend configured, returns that.
474    /// Otherwise, returns None (caller should use global backend).
475    pub fn get_hat_backend(&self, hat_id: &HatId) -> Option<&HatBackend> {
476        self.registry
477            .get_config(hat_id)
478            .and_then(|config| config.backend.as_ref())
479    }
480
481    /// Adds an observer that receives all published events.
482    ///
483    /// Multiple observers can be added (e.g., session recorder + TUI).
484    /// Each observer is called before events are routed to subscribers.
485    pub fn add_observer<F>(&mut self, observer: F)
486    where
487        F: Fn(&Event) + Send + 'static,
488    {
489        self.bus.add_observer(observer);
490    }
491
492    /// Sets a single observer, clearing any existing observers.
493    ///
494    /// Prefer `add_observer` when multiple observers are needed.
495    #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
496    pub fn set_observer<F>(&mut self, observer: F)
497    where
498        F: Fn(&Event) + Send + 'static,
499    {
500        #[allow(deprecated)]
501        self.bus.set_observer(observer);
502    }
503
504    /// Checks if any termination condition is met.
505    pub fn check_termination(&self) -> Option<TerminationReason> {
506        let cfg = &self.config.event_loop;
507
508        if self.state.iteration >= cfg.max_iterations {
509            return Some(TerminationReason::MaxIterations);
510        }
511
512        if self.state.elapsed().as_secs() >= cfg.max_runtime_seconds {
513            return Some(TerminationReason::MaxRuntime);
514        }
515
516        if let Some(max_cost) = cfg.max_cost_usd
517            && self.state.cumulative_cost >= max_cost
518        {
519            return Some(TerminationReason::MaxCost);
520        }
521
522        if self.state.consecutive_failures >= cfg.max_consecutive_failures {
523            return Some(TerminationReason::ConsecutiveFailures);
524        }
525
526        // Check for loop thrashing: planner keeps dispatching abandoned tasks
527        if self.state.abandoned_task_redispatches >= 3 {
528            return Some(TerminationReason::LoopThrashing);
529        }
530
531        // Check for validation failures: too many consecutive malformed JSONL lines
532        if self.state.consecutive_malformed_events >= 3 {
533            return Some(TerminationReason::ValidationFailure);
534        }
535
536        // Check for stale loop: same event signature emitted 3+ times in a row
537        if self.state.consecutive_same_signature >= 3 {
538            let topic = self
539                .state
540                .last_emitted_signature
541                .as_ref()
542                .map(|signature| signature.topic.as_str())
543                .unwrap_or("?");
544            warn!(
545                topic,
546                count = self.state.consecutive_same_signature,
547                "Stale loop detected: same event signature emitted consecutively"
548            );
549            return Some(TerminationReason::LoopStale);
550        }
551
552        // Check for stop signal from Telegram /stop or CLI stop-requested
553        let stop_path =
554            std::path::Path::new(&self.config.core.workspace_root).join(".ralph/stop-requested");
555        if stop_path.exists() {
556            let _ = std::fs::remove_file(&stop_path);
557            return Some(TerminationReason::Stopped);
558        }
559
560        // Check for restart signal from Telegram /restart command
561        let restart_path =
562            std::path::Path::new(&self.config.core.workspace_root).join(".ralph/restart-requested");
563        if restart_path.exists() {
564            return Some(TerminationReason::RestartRequested);
565        }
566
567        // Check if workspace directory has been removed (zombie worktree detection)
568        if !std::path::Path::new(&self.config.core.workspace_root).is_dir() {
569            return Some(TerminationReason::WorkspaceGone);
570        }
571
572        None
573    }
574
575    /// Check if a loop.cancel event was detected.
576    ///
577    /// Unlike check_completion_event(), this does NOT validate required_events.
578    /// Cancellation is an explicit abort — it doesn't need the workflow to be complete.
579    pub fn check_cancellation_event(&mut self) -> Option<TerminationReason> {
580        if !self.state.cancellation_requested {
581            return None;
582        }
583        self.state.cancellation_requested = false;
584        info!("Loop cancelled gracefully via loop.cancel event");
585
586        self.diagnostics.log_orchestration(
587            self.state.iteration,
588            "loop",
589            crate::diagnostics::OrchestrationEvent::LoopTerminated {
590                reason: "cancelled".to_string(),
591            },
592        );
593
594        Some(TerminationReason::Cancelled)
595    }
596
597    /// Checks if a completion event was received and returns termination reason.
598    ///
599    /// Completion is only accepted via JSONL events (e.g., `ralph emit`).
600    pub fn check_completion_event(&mut self) -> Option<TerminationReason> {
601        if !self.state.completion_requested {
602            return None;
603        }
604
605        // Event chain validation: check required events were seen
606        let required = &self.config.event_loop.required_events;
607        if !required.is_empty() {
608            let missing = self.state.missing_required_events(required);
609            if !missing.is_empty() {
610                warn!(
611                    missing = ?missing,
612                    "Rejecting LOOP_COMPLETE: required events not seen during loop lifetime"
613                );
614                self.state.completion_requested = false;
615
616                // Inject task.resume so the loop continues
617                let resume_payload = format!(
618                    "LOOP_COMPLETE rejected: missing required events: {:?}. \
619                     The agent must complete all workflow phases before emitting LOOP_COMPLETE. \
620                     Use loop.cancel to abort the workflow instead.",
621                    missing
622                );
623                self.bus.publish(Event::new("task.resume", resume_payload));
624                return None;
625            }
626        }
627
628        self.state.completion_requested = false;
629
630        // In persistent mode, suppress completion and keep the loop alive
631        if self.config.event_loop.persistent {
632            info!("Completion event suppressed - persistent mode active, loop staying alive");
633
634            self.diagnostics.log_orchestration(
635                self.state.iteration,
636                "loop",
637                crate::diagnostics::OrchestrationEvent::LoopTerminated {
638                    reason: "completion_event_suppressed_persistent".to_string(),
639                },
640            );
641
642            // Inject a task.resume event so the loop continues with an idle prompt
643            let resume_event = Event::new(
644                "task.resume",
645                "Persistent mode: loop staying alive after completion signal. \
646                 Check for new tasks or await human guidance.",
647            );
648            self.bus.publish(resume_event);
649
650            return None;
651        }
652
653        // Runtime tasks are the canonical queue when memories/tasks mode is enabled.
654        if self.config.memories.enabled {
655            if let Ok(false) = self.verify_tasks_complete() {
656                let open_tasks = self.get_open_task_list();
657                warn!(
658                    open_tasks = ?open_tasks,
659                    "Rejecting completion event with {} open task(s)",
660                    open_tasks.len()
661                );
662                self.bus.publish(Event::new(
663                    "task.resume",
664                    format!(
665                        "Completion rejected: runtime tasks remain open: {:?}. Close, fail, or reopen outstanding tasks before emitting the completion promise.",
666                        open_tasks
667                    ),
668                ));
669                return None;
670            }
671        } else if let Ok(false) = self.verify_scratchpad_complete() {
672            warn!("Completion event with pending scratchpad tasks - trusting agent decision");
673        }
674
675        info!("Completion event detected - terminating");
676
677        // Log loop terminated
678        self.diagnostics.log_orchestration(
679            self.state.iteration,
680            "loop",
681            crate::diagnostics::OrchestrationEvent::LoopTerminated {
682                reason: "completion_event".to_string(),
683            },
684        );
685
686        Some(TerminationReason::CompletionPromise)
687    }
688
689    /// Initializes the loop by publishing the start event.
690    pub fn initialize(&mut self, prompt_content: &str) {
691        // Use configured starting_event or default to task.start for backward compatibility
692        let topic = self
693            .config
694            .event_loop
695            .starting_event
696            .clone()
697            .unwrap_or_else(|| "task.start".to_string());
698        self.initialize_with_topic(&topic, prompt_content);
699    }
700
701    /// Initializes the loop for resume mode by publishing task.resume.
702    ///
703    /// Per spec: "User can run `ralph resume` to restart reading existing scratchpad."
704    /// The planner should read the existing scratchpad rather than doing fresh gap analysis.
705    pub fn initialize_resume(&mut self, prompt_content: &str) {
706        // Resume always uses task.resume regardless of starting_event config
707        self.initialize_with_topic("task.resume", prompt_content);
708    }
709
710    /// Common initialization logic with configurable topic.
711    fn initialize_with_topic(&mut self, topic: &str, prompt_content: &str) {
712        // Store the objective so it persists across all iterations.
713        // After iteration 1, bus.take_pending() consumes the start event,
714        // so without this the objective would be invisible to later hats.
715        self.ralph.set_objective(prompt_content.to_string());
716
717        let start_event = Event::new(topic, prompt_content);
718        self.bus.publish(start_event);
719        debug!(topic = topic, "Published {} event", topic);
720    }
721
722    /// Gets the next hat to execute (if any have pending events).
723    ///
724    /// Per "Hatless Ralph" architecture: When custom hats are defined, Ralph is
725    /// always the executor. Custom hats define topology (pub/sub contracts) that
726    /// Ralph uses for coordination context, but Ralph handles all iterations.
727    ///
728    /// - Solo mode (no custom hats): Returns "ralph" if Ralph has pending events
729    /// - Multi-hat mode (custom hats defined): Always returns "ralph" if ANY hat has pending events
730    pub fn next_hat(&self) -> Option<&HatId> {
731        let next = self.bus.next_hat_with_pending();
732
733        // If no pending hat events but human interactions are pending, route to Ralph.
734        if next.is_none() && self.bus.has_human_pending() {
735            return self.bus.hat_ids().find(|id| id.as_str() == "ralph");
736        }
737
738        // If no pending events, return None
739        next.as_ref()?;
740
741        // In multi-hat mode, always route to Ralph (custom hats define topology only)
742        // Ralph's prompt includes the ## HATS section for coordination awareness
743        if self.registry.is_empty() {
744            // Solo mode - return the next hat (which is "ralph")
745            next
746        } else {
747            // Return "ralph" - the constant coordinator
748            // Find ralph in the bus's registered hats
749            self.bus.hat_ids().find(|id| id.as_str() == "ralph")
750        }
751    }
752
753    /// Advances the event reader to the current end of the events file.
754    ///
755    /// Call this after writing observability records (e.g. start event) to the
756    /// events JSONL file so they are not re-read by `process_events_from_jsonl`.
757    /// The start event is already published to the bus via `initialize()`, so
758    /// re-reading it from the file would cause double-delivery.
759    pub fn sync_event_reader_to_file_end(&mut self) {
760        let path = self.event_reader.path();
761        if let Ok(metadata) = std::fs::metadata(path) {
762            self.event_reader.set_position(metadata.len());
763        }
764    }
765
766    /// Checks if any hats have pending events.
767    ///
768    /// Use this after `process_output` to detect if the LLM failed to publish an event.
769    /// If false after processing, the loop will terminate on the next iteration.
770    pub fn has_pending_events(&self) -> bool {
771        self.bus.next_hat_with_pending().is_some() || self.bus.has_human_pending()
772    }
773
774    /// Checks if any pending events are human-related (human.response, human.guidance).
775    ///
776    /// Used to skip cooldown delays when a human event is next, since we don't
777    /// want to artificially delay the response to a human interaction.
778    pub fn has_pending_human_events(&self) -> bool {
779        self.bus.has_human_pending()
780    }
781
782    /// Injects `human.guidance` events directly into the in-memory bus.
783    ///
784    /// This is used for local TUI/RPC guidance so the next prompt boundary
785    /// sees the message immediately without waiting for a JSONL reread.
786    pub fn inject_human_guidance<I, S>(&mut self, messages: I)
787    where
788        I: IntoIterator<Item = S>,
789        S: Into<String>,
790    {
791        for message in messages {
792            let event = Event::new("human.guidance", message.into());
793            self.state.record_event(&event);
794            self.bus.publish(event);
795        }
796    }
797
798    /// Returns whether unread JSONL events include any semantic `plan.*` topics.
799    ///
800    /// This allows callers to dispatch `pre.plan.created` hooks before
801    /// event publication handling without consuming unread events.
802    pub fn has_pending_plan_events_in_jsonl(&self) -> std::io::Result<bool> {
803        let result = self.event_reader.peek_new_events()?;
804        Ok(result
805            .events
806            .iter()
807            .any(|event| event.topic.starts_with("plan.")))
808    }
809
810    /// Returns structured context for the first unread `human.interact` event,
811    /// if one is present in JSONL without consuming reader state.
812    pub fn pending_human_interact_context_in_jsonl(&self) -> std::io::Result<Option<Value>> {
813        let result = self.event_reader.peek_new_events()?;
814        Ok(result
815            .events
816            .iter()
817            .find(|event| event.topic == "human.interact")
818            .map(|event| {
819                Self::parse_human_interact_context(event.payload.as_deref().unwrap_or_default())
820            }))
821    }
822
823    /// Gets the topics a hat is allowed to publish.
824    ///
825    /// Used to build retry prompts when the LLM forgets to publish an event.
826    pub fn get_hat_publishes(&self, hat_id: &HatId) -> Vec<String> {
827        self.registry
828            .get(hat_id)
829            .map(|hat| hat.publishes.iter().map(|t| t.to_string()).collect())
830            .unwrap_or_default()
831    }
832
833    /// Injects a fallback event to recover from a stalled loop.
834    ///
835    /// When no hats have pending events (agent failed to publish), this method
836    /// injects a `task.resume` event which Ralph will handle to attempt recovery.
837    ///
838    /// Returns true if a fallback event was injected, false if recovery is not possible.
839    pub fn inject_fallback_event(&mut self) -> bool {
840        // If a custom hat was last executing, target the fallback back to it
841        // This preserves hat context instead of always falling back to Ralph
842        let fallback_event = match &self.state.last_hat {
843            Some(hat_id) if hat_id.as_str() != "ralph" => {
844                let publishes = self.get_hat_publishes(hat_id);
845                let payload = if publishes.is_empty() {
846                    format!(
847                        "RECOVERY: Previous iteration by hat `{}` did not publish an event. \
848                         Emit exactly one valid next event via `ralph emit`, or stop only after \
849                         publishing the configured completion event.",
850                        hat_id.as_str()
851                    )
852                } else {
853                    format!(
854                        "RECOVERY: Previous iteration by hat `{}` did not publish an event. \
855                         This failed because no event was emitted. Emit exactly ONE valid next \
856                         event via `ralph emit`. Allowed topics: `{}`. Do not only write prose \
857                         or update files. Stop immediately after emitting.",
858                        hat_id.as_str(),
859                        publishes.join("`, `")
860                    )
861                };
862
863                debug!(
864                    hat = %hat_id.as_str(),
865                    "Injecting fallback event to recover - targeting last hat with task.resume"
866                );
867                Event::new("task.resume", payload).with_target(hat_id.clone())
868            }
869            _ => {
870                debug!("Injecting fallback event to recover - triggering Ralph with task.resume");
871                Event::new(
872                    "task.resume",
873                    "RECOVERY: Previous iteration did not publish an event. \
874                     Review the scratchpad and either dispatch the next task or complete the loop.",
875                )
876            }
877        };
878
879        self.bus.publish(fallback_event);
880        true
881    }
882
883    /// Builds the prompt for a hat's execution.
884    ///
885    /// Per "Hatless Ralph" architecture:
886    /// - Solo mode: Ralph handles everything with his own prompt
887    /// - Multi-hat mode: Ralph is the sole executor, custom hats define topology only
888    ///
889    /// When in multi-hat mode, this method collects ALL pending events across all hats
890    /// and builds Ralph's prompt with that context. The `## HATS` section in Ralph's
891    /// prompt documents the topology for coordination awareness.
892    ///
893    /// If memories are configured with `inject: auto`, this method also prepends
894    /// primed memories to the prompt context. If a scratchpad file exists and is
895    /// non-empty, its content is also prepended (before memories).
896    pub fn build_prompt(&mut self, hat_id: &HatId) -> Option<String> {
897        // Handle "ralph" hat - the constant coordinator
898        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
899        if hat_id.as_str() == "ralph" {
900            if self.registry.is_empty() {
901                // Solo mode - just Ralph's events, no hats to filter
902                let mut events = self.bus.take_pending(&hat_id.clone());
903                let mut human_events = self.bus.take_human_pending();
904                events.append(&mut human_events);
905
906                // Separate human.guidance events from regular events
907                let (guidance_events, regular_events): (Vec<_>, Vec<_>) = events
908                    .into_iter()
909                    .partition(|e| e.topic.as_str() == "human.guidance");
910
911                let events_context = regular_events
912                    .iter()
913                    .map(|e| Self::format_event(e))
914                    .collect::<Vec<_>>()
915                    .join("\n");
916
917                // Persist and inject human guidance into prompt if present
918                self.update_robot_guidance(guidance_events);
919                self.apply_robot_guidance();
920
921                // Build base prompt and prepend memories + scratchpad + ready tasks
922                let base_prompt = self.ralph.build_prompt(&events_context, &[]);
923                self.ralph.clear_robot_guidance();
924                let with_skills = self.prepend_auto_inject_skills(base_prompt);
925                let with_scratchpad = self.prepend_scratchpad(with_skills);
926                let final_prompt = self.prepend_ready_tasks(with_scratchpad);
927
928                debug!("build_prompt: routing to HatlessRalph (solo mode)");
929                return Some(final_prompt);
930            } else {
931                // Multi-hat mode: collect events and determine active hats
932                let mut all_hat_ids: Vec<HatId> = self.bus.hat_ids().cloned().collect();
933                // Deterministic ordering (avoid HashMap iteration order nondeterminism).
934                all_hat_ids.sort_by(|a, b| a.as_str().cmp(b.as_str()));
935
936                let mut all_events = Vec::new();
937                let mut system_events = Vec::new();
938
939                for id in &all_hat_ids {
940                    let pending = self.bus.take_pending(id);
941                    if pending.is_empty() {
942                        continue;
943                    }
944
945                    let (drop_pending, exhausted_event) = self.check_hat_exhaustion(id, &pending);
946                    if drop_pending {
947                        // Drop the pending events that would have activated the hat.
948                        if let Some(exhausted_event) = exhausted_event {
949                            all_events.push(exhausted_event.clone());
950                            system_events.push(exhausted_event);
951                        }
952                        continue;
953                    }
954
955                    all_events.extend(pending);
956                }
957
958                let mut human_events = self.bus.take_human_pending();
959                all_events.append(&mut human_events);
960
961                // Publish orchestrator-generated system events after consuming pending events,
962                // so they become visible in the event log and can be handled next iteration.
963                for event in system_events {
964                    self.bus.publish(event);
965                }
966
967                // Separate human.guidance events from regular events
968                let (guidance_events, regular_events): (Vec<_>, Vec<_>) = all_events
969                    .into_iter()
970                    .partition(|e| e.topic.as_str() == "human.guidance");
971
972                // Persist and inject human guidance before building prompt (must happen before
973                // immutable borrows from determine_active_hats)
974                self.update_robot_guidance(guidance_events);
975                self.apply_robot_guidance();
976
977                // Ignore kickoff/recovery noise when a real downstream event is pending.
978                let effective_regular_events = self.effective_regular_events(&regular_events);
979
980                // Determine which hats are active based on the effective event set
981                let active_hat_ids = self.determine_active_hat_ids(&regular_events);
982                self.record_hat_activations(&active_hat_ids);
983                self.state.last_active_hat_ids = active_hat_ids.clone();
984                let active_hats = self.determine_active_hats(&regular_events);
985
986                // Format events for context
987                let events_context = effective_regular_events
988                    .iter()
989                    .map(|e| Self::format_event(e))
990                    .collect::<Vec<_>>()
991                    .join("\n");
992
993                // Build base prompt and prepend memories + scratchpad if available
994                let base_prompt = self.ralph.build_prompt(&events_context, &active_hats);
995
996                // Build prompt with active hats - filters instructions to only active hats
997                debug!(
998                    "build_prompt: routing to HatlessRalph (multi-hat coordinator mode), active_hats: {:?}",
999                    active_hats
1000                        .iter()
1001                        .map(|h| h.id.as_str())
1002                        .collect::<Vec<_>>()
1003                );
1004
1005                // Clear guidance after active_hats references are no longer needed
1006                self.ralph.clear_robot_guidance();
1007                let with_skills = self.prepend_auto_inject_skills(base_prompt);
1008                let with_scratchpad = self.prepend_scratchpad(with_skills);
1009                let final_prompt = self.prepend_ready_tasks(with_scratchpad);
1010
1011                return Some(final_prompt);
1012            }
1013        }
1014
1015        // Non-ralph hat requested - this shouldn't happen in multi-hat mode since
1016        // next_hat() always returns "ralph" when custom hats are defined.
1017        // But we keep this code path for backward compatibility and tests.
1018        let events = self.bus.take_pending(&hat_id.clone());
1019        let events_context = events
1020            .iter()
1021            .map(|e| Self::format_event(e))
1022            .collect::<Vec<_>>()
1023            .join("\n");
1024
1025        let hat = self.registry.get(hat_id)?;
1026
1027        // Debug logging to trace hat routing
1028        debug!(
1029            "build_prompt: hat_id='{}', instructions.is_empty()={}",
1030            hat_id.as_str(),
1031            hat.instructions.is_empty()
1032        );
1033
1034        // All hats use build_custom_hat with ghuntley-style prompts
1035        debug!(
1036            "build_prompt: routing to build_custom_hat() for '{}'",
1037            hat_id.as_str()
1038        );
1039        Some(
1040            self.instruction_builder
1041                .build_custom_hat(hat, &events_context),
1042        )
1043    }
1044
1045    /// Stores guidance payloads, persists them to scratchpad, and prepares them for prompt injection.
1046    ///
1047    /// Guidance events are ephemeral in the event bus (consumed by `take_pending`).
1048    /// This method both caches them in memory for prompt injection and appends
1049    /// them to the scratchpad file so they survive across process restarts.
1050    fn update_robot_guidance(&mut self, guidance_events: Vec<Event>) {
1051        if guidance_events.is_empty() {
1052            return;
1053        }
1054
1055        // Persist new guidance to scratchpad before caching
1056        self.persist_guidance_to_scratchpad(&guidance_events);
1057
1058        self.robot_guidance
1059            .extend(guidance_events.into_iter().map(|e| e.payload));
1060    }
1061
1062    /// Appends human guidance entries to the scratchpad file for durability.
1063    ///
1064    /// Each guidance message is written as a timestamped markdown entry so it
1065    /// appears alongside the agent's own thinking and survives process restarts.
1066    fn persist_guidance_to_scratchpad(&self, guidance_events: &[Event]) {
1067        use std::io::Write;
1068
1069        let scratchpad_path = self.scratchpad_path();
1070        let resolved_path = if scratchpad_path.is_relative() {
1071            self.config.core.workspace_root.join(&scratchpad_path)
1072        } else {
1073            scratchpad_path
1074        };
1075
1076        // Create parent directories if needed
1077        if let Some(parent) = resolved_path.parent()
1078            && !parent.exists()
1079            && let Err(e) = std::fs::create_dir_all(parent)
1080        {
1081            warn!("Failed to create scratchpad directory: {}", e);
1082            return;
1083        }
1084
1085        let mut file = match std::fs::OpenOptions::new()
1086            .create(true)
1087            .append(true)
1088            .open(&resolved_path)
1089        {
1090            Ok(f) => f,
1091            Err(e) => {
1092                warn!("Failed to open scratchpad for guidance persistence: {}", e);
1093                return;
1094            }
1095        };
1096
1097        let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
1098        for event in guidance_events {
1099            let entry = format!(
1100                "\n### HUMAN GUIDANCE ({})\n\n{}\n",
1101                timestamp, event.payload
1102            );
1103            if let Err(e) = file.write_all(entry.as_bytes()) {
1104                warn!("Failed to write guidance to scratchpad: {}", e);
1105            }
1106        }
1107
1108        info!(
1109            count = guidance_events.len(),
1110            "Persisted human guidance to scratchpad"
1111        );
1112    }
1113
1114    /// Injects cached guidance into the next prompt build.
1115    fn apply_robot_guidance(&mut self) {
1116        if self.robot_guidance.is_empty() {
1117            return;
1118        }
1119
1120        self.ralph.set_robot_guidance(self.robot_guidance.clone());
1121    }
1122
1123    /// Prepends auto-injected skill content to the prompt.
1124    ///
1125    /// This generalizes the former `prepend_memories()` into a skill auto-injection
1126    /// pipeline that handles memories, tools, and any other auto-inject skills.
1127    ///
1128    /// Injection order:
1129    /// 1. Memory data + ralph-tools skill (special case: loads memory data from store, applies budget)
1130    /// 2. RObot interaction skill (gated by `robot.enabled`)
1131    /// 3. Other auto-inject skills from the registry (wrapped in XML tags)
1132    fn prepend_auto_inject_skills(&self, prompt: String) -> String {
1133        let mut prefix = String::new();
1134
1135        // 1. Memory data + ralph-tools skill — special case with data loading
1136        self.inject_memories_and_tools_skill(&mut prefix);
1137
1138        // 2. RObot interaction skill — gated by robot.enabled
1139        self.inject_robot_skill(&mut prefix);
1140
1141        // 3. Other auto-inject skills from the registry
1142        self.inject_custom_auto_skills(&mut prefix);
1143
1144        if prefix.is_empty() {
1145            return prompt;
1146        }
1147
1148        prefix.push_str("\n\n");
1149        prefix.push_str(&prompt);
1150        prefix
1151    }
1152
1153    /// Injects memory data and the ralph-tools skill into the prefix.
1154    ///
1155    /// Special case: loads memory entries from the store, applies budget
1156    /// truncation, then appends the ralph-tools skill content (which covers
1157    /// both tasks and memories CLI usage).
1158    /// Memory data is gated by `memories.enabled && memories.inject == Auto`.
1159    /// The ralph-tools skill is injected when either memories or tasks are enabled.
1160    fn inject_memories_and_tools_skill(&self, prefix: &mut String) {
1161        let memories_config = &self.config.memories;
1162
1163        // Inject memory DATA if memories are enabled with auto-inject
1164        if memories_config.enabled && memories_config.inject == InjectMode::Auto {
1165            info!(
1166                "Memory injection check: enabled={}, inject={:?}, workspace_root={:?}",
1167                memories_config.enabled, memories_config.inject, self.config.core.workspace_root
1168            );
1169
1170            let workspace_root = &self.config.core.workspace_root;
1171            let store = MarkdownMemoryStore::with_default_path(workspace_root);
1172            let memories_path = workspace_root.join(".ralph/agent/memories.md");
1173
1174            info!(
1175                "Looking for memories at: {:?} (exists: {})",
1176                memories_path,
1177                memories_path.exists()
1178            );
1179
1180            let memories = match store.load() {
1181                Ok(memories) => {
1182                    info!("Successfully loaded {} memories from store", memories.len());
1183                    memories
1184                }
1185                Err(e) => {
1186                    info!(
1187                        "Failed to load memories for injection: {} (path: {:?})",
1188                        e, memories_path
1189                    );
1190                    Vec::new()
1191                }
1192            };
1193
1194            if memories.is_empty() {
1195                info!("Memory store is empty - no memories to inject");
1196            } else {
1197                let mut memories_content = format_memories_as_markdown(&memories);
1198
1199                if memories_config.budget > 0 {
1200                    let original_len = memories_content.len();
1201                    memories_content =
1202                        truncate_to_budget(&memories_content, memories_config.budget);
1203                    debug!(
1204                        "Applied budget: {} chars -> {} chars (budget: {})",
1205                        original_len,
1206                        memories_content.len(),
1207                        memories_config.budget
1208                    );
1209                }
1210
1211                info!(
1212                    "Injecting {} memories ({} chars) into prompt",
1213                    memories.len(),
1214                    memories_content.len()
1215                );
1216
1217                prefix.push_str(&memories_content);
1218            }
1219        }
1220
1221        // Inject ralph-tools skills conditionally based on config
1222        let tasks_enabled = self.config.tasks.enabled;
1223
1224        // Base skill (shared commands) when either memories or tasks are enabled
1225        if (memories_config.enabled || tasks_enabled)
1226            && let Some(skill) = self.skill_registry.get("ralph-tools")
1227        {
1228            if !prefix.is_empty() {
1229                prefix.push_str("\n\n");
1230            }
1231            prefix.push_str(&format!(
1232                "<ralph-tools-skill>\n{}\n</ralph-tools-skill>",
1233                skill.content.trim()
1234            ));
1235            debug!("Injected ralph-tools skill from registry");
1236        }
1237
1238        // Tasks skill — only when tasks are enabled
1239        if tasks_enabled && let Some(skill) = self.skill_registry.get("ralph-tools-tasks") {
1240            if !prefix.is_empty() {
1241                prefix.push_str("\n\n");
1242            }
1243            prefix.push_str(&format!(
1244                "<ralph-tools-tasks-skill>\n{}\n</ralph-tools-tasks-skill>",
1245                skill.content.trim()
1246            ));
1247            debug!("Injected ralph-tools-tasks skill from registry");
1248        }
1249
1250        // Memories skill — only when memories are enabled
1251        if memories_config.enabled
1252            && let Some(skill) = self.skill_registry.get("ralph-tools-memories")
1253        {
1254            if !prefix.is_empty() {
1255                prefix.push_str("\n\n");
1256            }
1257            prefix.push_str(&format!(
1258                "<ralph-tools-memories-skill>\n{}\n</ralph-tools-memories-skill>",
1259                skill.content.trim()
1260            ));
1261            debug!("Injected ralph-tools-memories skill from registry");
1262        }
1263    }
1264
1265    /// Injects the RObot interaction skill content into the prefix.
1266    ///
1267    /// Gated by `robot.enabled`. Teaches agents how and when to interact
1268    /// with humans via `human.interact` events.
1269    fn inject_robot_skill(&self, prefix: &mut String) {
1270        if !self.config.robot.enabled {
1271            return;
1272        }
1273
1274        if let Some(skill) = self.skill_registry.get("robot-interaction") {
1275            if !prefix.is_empty() {
1276                prefix.push_str("\n\n");
1277            }
1278            prefix.push_str(&format!(
1279                "<robot-skill>\n{}\n</robot-skill>",
1280                skill.content.trim()
1281            ));
1282            debug!("Injected robot interaction skill from registry");
1283        }
1284    }
1285
1286    /// Injects any user-configured auto-inject skills (excluding built-in skills handled separately).
1287    fn inject_custom_auto_skills(&self, prefix: &mut String) {
1288        for skill in self.skill_registry.auto_inject_skills(None) {
1289            // Skip built-in skills handled above
1290            if matches!(
1291                skill.name.as_str(),
1292                "ralph-tools" | "ralph-tools-tasks" | "ralph-tools-memories" | "robot-interaction"
1293            ) {
1294                continue;
1295            }
1296
1297            if !prefix.is_empty() {
1298                prefix.push_str("\n\n");
1299            }
1300            prefix.push_str(&format!(
1301                "<{name}-skill>\n{content}\n</{name}-skill>",
1302                name = skill.name,
1303                content = skill.content.trim()
1304            ));
1305            debug!("Injected auto-inject skill: {}", skill.name);
1306        }
1307    }
1308
1309    /// Prepends scratchpad content to the prompt if the file exists and is non-empty.
1310    ///
1311    /// The scratchpad is the agent's working memory for the current objective.
1312    /// Auto-injecting saves one tool call per iteration.
1313    /// When the file exceeds the budget, the TAIL is kept (most recent entries).
1314    fn prepend_scratchpad(&self, prompt: String) -> String {
1315        let scratchpad_path = self.scratchpad_path();
1316
1317        let resolved_path = if scratchpad_path.is_relative() {
1318            self.config.core.workspace_root.join(&scratchpad_path)
1319        } else {
1320            scratchpad_path
1321        };
1322
1323        if !resolved_path.exists() {
1324            debug!(
1325                "Scratchpad not found at {:?}, skipping injection",
1326                resolved_path
1327            );
1328            return prompt;
1329        }
1330
1331        let content = match std::fs::read_to_string(&resolved_path) {
1332            Ok(c) => c,
1333            Err(e) => {
1334                info!("Failed to read scratchpad for injection: {}", e);
1335                return prompt;
1336            }
1337        };
1338
1339        if content.trim().is_empty() {
1340            debug!("Scratchpad is empty, skipping injection");
1341            return prompt;
1342        }
1343
1344        // Budget: 4000 tokens ~16000 chars. Keep the TAIL (most recent content).
1345        let char_budget = 4000 * 4;
1346        let content = if content.len() > char_budget {
1347            // Find a line boundary near the start of the tail
1348            let start = content.len() - char_budget;
1349            // Ensure we start at a valid UTF-8 character boundary
1350            let start = floor_char_boundary(&content, start);
1351            let line_start = content[start..].find('\n').map_or(start, |n| start + n + 1);
1352            let discarded = &content[..line_start];
1353
1354            // Summarize discarded content by extracting markdown headings
1355            let headings: Vec<&str> = discarded
1356                .lines()
1357                .filter(|line| line.starts_with('#'))
1358                .collect();
1359            let summary = if headings.is_empty() {
1360                format!(
1361                    "<!-- earlier content truncated ({} chars omitted) -->",
1362                    line_start
1363                )
1364            } else {
1365                format!(
1366                    "<!-- earlier content truncated ({} chars omitted) -->\n\
1367                     <!-- discarded sections: {} -->",
1368                    line_start,
1369                    headings.join(" | ")
1370                )
1371            };
1372
1373            format!("{}\n\n{}", summary, &content[line_start..])
1374        } else {
1375            content
1376        };
1377
1378        info!("Injecting scratchpad ({} chars) into prompt", content.len());
1379
1380        let mut final_prompt = format!(
1381            "<scratchpad path=\"{}\">\n{}\n</scratchpad>\n\n",
1382            self.config.core.scratchpad, content
1383        );
1384        final_prompt.push_str(&prompt);
1385        final_prompt
1386    }
1387
1388    /// Prepends ready tasks to the prompt if tasks are enabled and any exist.
1389    ///
1390    /// Loads the task store and formats ready (unblocked, open) tasks into
1391    /// a `<ready-tasks>` XML block. This saves the agent a tool call per
1392    /// iteration and puts tasks at the same prominence as the scratchpad.
1393    fn prepend_ready_tasks(&self, prompt: String) -> String {
1394        if !self.config.tasks.enabled {
1395            return prompt;
1396        }
1397
1398        use crate::task::TaskStatus;
1399        use crate::task_store::TaskStore;
1400
1401        let tasks_path = self.tasks_path();
1402        let resolved_path = if tasks_path.is_relative() {
1403            self.config.core.workspace_root.join(&tasks_path)
1404        } else {
1405            tasks_path
1406        };
1407
1408        if !resolved_path.exists() {
1409            return prompt;
1410        }
1411
1412        let store = match TaskStore::load(&resolved_path) {
1413            Ok(s) => s,
1414            Err(e) => {
1415                info!("Failed to load task store for injection: {}", e);
1416                return prompt;
1417            }
1418        };
1419
1420        let current_loop_id = self.current_loop_id();
1421
1422        let ready = Self::filter_tasks_by_loop(store.ready(), current_loop_id.as_deref());
1423        let open = Self::filter_tasks_by_loop(store.open(), current_loop_id.as_deref());
1424        let all_count =
1425            Self::filter_tasks_by_loop(store.all().iter().collect(), current_loop_id.as_deref())
1426                .len();
1427        let closed_count = all_count - open.len();
1428
1429        if open.is_empty() && closed_count == 0 {
1430            return prompt;
1431        }
1432
1433        let mut section = String::from("<ready-tasks>\n");
1434        if ready.is_empty() && open.is_empty() {
1435            section.push_str("No open tasks. Create tasks with `ralph tools task add`.\n");
1436        } else {
1437            section.push_str(&format!(
1438                "## Tasks: {} ready, {} open, {} closed\n\n",
1439                ready.len(),
1440                open.len(),
1441                closed_count
1442            ));
1443            for task in &ready {
1444                let status_icon = match task.status {
1445                    TaskStatus::Open => "[ ]",
1446                    TaskStatus::InProgress => "[~]",
1447                    _ => "[?]",
1448                };
1449                section.push_str(&format!(
1450                    "- {} [P{}] {} ({}){}\n",
1451                    status_icon,
1452                    task.priority,
1453                    task.title,
1454                    task.id,
1455                    task.key
1456                        .as_deref()
1457                        .map(|key| format!(" — key: {key}"))
1458                        .unwrap_or_default()
1459                ));
1460            }
1461            // Show blocked tasks separately so agent knows they exist
1462            let ready_ids: Vec<&str> = ready.iter().map(|t| t.id.as_str()).collect();
1463            let blocked: Vec<_> = open
1464                .iter()
1465                .filter(|t| !ready_ids.contains(&t.id.as_str()))
1466                .collect();
1467            if !blocked.is_empty() {
1468                section.push_str("\nBlocked:\n");
1469                for task in blocked {
1470                    section.push_str(&format!(
1471                        "- [blocked] [P{}] {} ({}){} — blocked by: {}\n",
1472                        task.priority,
1473                        task.title,
1474                        task.id,
1475                        task.key
1476                            .as_deref()
1477                            .map(|key| format!(" — key: {key}"))
1478                            .unwrap_or_default(),
1479                        task.blocked_by.join(", ")
1480                    ));
1481                }
1482            }
1483        }
1484        section.push_str("</ready-tasks>\n\n");
1485
1486        info!(
1487            "Injecting ready tasks ({} ready, {} open, {} closed) into prompt",
1488            ready.len(),
1489            open.len(),
1490            closed_count
1491        );
1492
1493        let mut final_prompt = section;
1494        final_prompt.push_str(&prompt);
1495        final_prompt
1496    }
1497
1498    /// Builds the Ralph prompt (coordination mode).
1499    pub fn build_ralph_prompt(&self, prompt_content: &str) -> String {
1500        self.ralph.build_prompt(prompt_content, &[])
1501    }
1502
1503    /// Determines which hats should be active based on pending events.
1504    /// Returns list of Hat references that are triggered by any pending event.
1505    fn determine_active_hats(&self, events: &[Event]) -> Vec<&Hat> {
1506        let mut active_hats = Vec::new();
1507        for id in self.determine_active_hat_ids(events) {
1508            if let Some(hat) = self.registry.get(&id) {
1509                active_hats.push(hat);
1510            }
1511        }
1512        active_hats
1513    }
1514
1515    fn determine_active_hat_ids(&self, events: &[Event]) -> Vec<HatId> {
1516        let mut entrypoint_hat_ids = Vec::new();
1517        let mut progressed_hat_ids = Vec::new();
1518        for event in events {
1519            // Prefer direct event target over topic-based lookup
1520            let hat_id = if let Some(target) = &event.target
1521                && self.registry.get(target).is_some()
1522            {
1523                target.clone()
1524            } else if let Some(hat) = self.registry.get_for_topic(event.topic.as_str()) {
1525                hat.id.clone()
1526            } else {
1527                continue;
1528            };
1529
1530            let list = if self.is_entrypoint_topic(event.topic.as_str()) {
1531                &mut entrypoint_hat_ids
1532            } else {
1533                &mut progressed_hat_ids
1534            };
1535            if !list.iter().any(|id| id == &hat_id) {
1536                list.push(hat_id);
1537            }
1538        }
1539        // Prefer progressed hats over entrypoint hats. Entrypoint events
1540        // (starting_event, task.start, task.resume) linger in the bus after
1541        // the first hat runs. Including them would re-activate the first hat
1542        // alongside downstream hats, confusing the agent with multiple hat
1543        // instructions when only the downstream hat should run.
1544        if progressed_hat_ids.is_empty() {
1545            entrypoint_hat_ids
1546        } else {
1547            progressed_hat_ids
1548        }
1549    }
1550
1551    fn effective_regular_events<'a>(&self, events: &'a [Event]) -> Vec<&'a Event> {
1552        let has_downstream_event = events
1553            .iter()
1554            .any(|event| !Self::is_kickoff_or_recovery_event(event.topic.as_str()));
1555        events
1556            .iter()
1557            .filter(|event| {
1558                !has_downstream_event || !Self::is_kickoff_or_recovery_event(event.topic.as_str())
1559            })
1560            .collect()
1561    }
1562
/// Returns `true` for `task.start`, `task.resume`, and any topic that ends
/// in `.start`.
fn is_kickoff_or_recovery_event(topic: &str) -> bool {
    matches!(topic, "task.start" | "task.resume") || topic.ends_with(".start")
}
1566
1567    fn is_entrypoint_topic(&self, topic: &str) -> bool {
1568        topic == "task.start"
1569            || topic == "task.resume"
1570            || topic.strip_suffix(".start").is_some()
1571            || self.config.event_loop.starting_event.as_deref() == Some(topic)
1572    }
1573
1574    fn peek_pending_regular_events(&self) -> Vec<Event> {
1575        let mut events = Vec::new();
1576        for hat_id in self.bus.hat_ids() {
1577            if let Some(pending) = self.bus.peek_pending(hat_id) {
1578                events.extend(pending.iter().cloned());
1579            }
1580        }
1581        events
1582    }
1583
1584    /// Formats an event for prompt context.
1585    ///
1586    /// For top-level prompts (task.start, task.resume), wraps the payload in
1587    /// `<top-level-prompt>` XML tags to clearly delineate the user's original request.
1588    fn format_event(event: &Event) -> String {
1589        let topic = &event.topic;
1590        let payload = &event.payload;
1591
1592        if topic.as_str() == "task.start" || topic.as_str() == "task.resume" {
1593            format!(
1594                "Event: {} - <top-level-prompt>\n{}\n</top-level-prompt>",
1595                topic, payload
1596            )
1597        } else {
1598            format!("Event: {} - {}", topic, payload)
1599        }
1600    }
1601
1602    fn check_hat_exhaustion(&mut self, hat_id: &HatId, dropped: &[Event]) -> (bool, Option<Event>) {
1603        let Some(config) = self.registry.get_config(hat_id) else {
1604            return (false, None);
1605        };
1606        let Some(max) = config.max_activations else {
1607            return (false, None);
1608        };
1609
1610        let count = *self.state.hat_activation_counts.get(hat_id).unwrap_or(&0);
1611        if count < max {
1612            return (false, None);
1613        }
1614
1615        // Emit only once per hat per run (avoid flooding).
1616        let should_emit = self.state.exhausted_hats.insert(hat_id.clone());
1617
1618        if !should_emit {
1619            // Hat is already exhausted - drop pending events silently.
1620            return (true, None);
1621        }
1622
1623        let mut dropped_topics: Vec<String> = dropped.iter().map(|e| e.topic.to_string()).collect();
1624        dropped_topics.sort();
1625
1626        let payload = format!(
1627            "Hat '{hat}' exhausted.\n- max_activations: {max}\n- activations: {count}\n- dropped_topics:\n  - {topics}",
1628            hat = hat_id.as_str(),
1629            max = max,
1630            count = count,
1631            topics = dropped_topics.join("\n  - ")
1632        );
1633
1634        warn!(
1635            hat = %hat_id.as_str(),
1636            max_activations = max,
1637            activations = count,
1638            "Hat exhausted (max_activations reached)"
1639        );
1640
1641        (
1642            true,
1643            Some(Event::new(
1644                format!("{}.exhausted", hat_id.as_str()),
1645                payload,
1646            )),
1647        )
1648    }
1649
1650    fn record_hat_activations(&mut self, active_hat_ids: &[HatId]) {
1651        for hat_id in active_hat_ids {
1652            *self
1653                .state
1654                .hat_activation_counts
1655                .entry(hat_id.clone())
1656                .or_insert(0) += 1;
1657        }
1658    }
1659
1660    /// Returns the primary active hat ID for display purposes.
1661    /// Returns the first active hat, or "ralph" if no specific hat is active.
1662    /// BTreeMap iteration is already sorted by key.
1663    pub fn get_active_hat_id(&self) -> HatId {
1664        let pending_events = self.peek_pending_regular_events();
1665        if let Some(active_hat_id) = self
1666            .determine_active_hat_ids(&pending_events)
1667            .into_iter()
1668            .next()
1669        {
1670            return active_hat_id;
1671        }
1672        HatId::new("ralph")
1673    }
1674
1675    /// Injects a default event for a hat when the agent wrote no events.
1676    ///
1677    /// Call this after `process_events_from_jsonl` returns `Ok(false)` (no events found).
1678    /// If the hat has `default_publishes` configured, this injects the default event.
1679    ///
1680    /// If the default topic matches the completion promise, `completion_requested` is set
1681    /// so the loop can terminate. Without this, completion events injected via
1682    /// `default_publishes` would only be published to the bus (triggering downstream hats)
1683    /// but never detected by `check_completion_event`, causing an infinite loop.
1684    pub fn check_default_publishes(&mut self, hat_id: &HatId) {
1685        if let Some(config) = self.registry.get_config(hat_id)
1686            && let Some(default_topic) = &config.default_publishes
1687        {
1688            let default_event = Event::new(default_topic.as_str(), "").with_source(hat_id.clone());
1689
1690            debug!(
1691                hat = %hat_id.as_str(),
1692                topic = %default_topic,
1693                "No events written by hat, injecting default_publishes event"
1694            );
1695
1696            self.state.record_event(&default_event);
1697
1698            // If the default topic is the completion promise, set the flag directly.
1699            // The normal path (process_events_from_jsonl) sets this when reading from
1700            // JSONL, but default_publishes bypasses JSONL entirely.
1701            if default_topic.as_str() == self.config.event_loop.completion_promise {
1702                info!(
1703                    hat = %hat_id.as_str(),
1704                    topic = %default_topic,
1705                    "default_publishes matches completion_promise — requesting termination"
1706                );
1707                self.state.completion_requested = true;
1708            }
1709
1710            self.bus.publish(default_event);
1711        }
1712    }
1713
1714    /// Returns a mutable reference to the event bus for direct event publishing.
1715    ///
1716    /// This is primarily used for planning sessions to inject user responses
1717    /// as events into the orchestration loop.
1718    pub fn bus(&mut self) -> &mut EventBus {
1719        &mut self.bus
1720    }
1721
1722    /// Processes output from a hat execution.
1723    ///
1724    /// Returns the termination reason if the loop should stop.
1725    pub fn process_output(
1726        &mut self,
1727        hat_id: &HatId,
1728        output: &str,
1729        success: bool,
1730    ) -> Option<TerminationReason> {
1731        self.state.iteration += 1;
1732        self.state.last_hat = Some(hat_id.clone());
1733
1734        // Periodic robot check-in
1735        if let Some(interval_secs) = self.config.robot.checkin_interval_seconds
1736            && let Some(ref robot_service) = self.robot_service
1737        {
1738            let elapsed = self.state.elapsed();
1739            let interval = std::time::Duration::from_secs(interval_secs);
1740            let last = self
1741                .state
1742                .last_checkin_at
1743                .map(|t| t.elapsed())
1744                .unwrap_or(elapsed);
1745
1746            if last >= interval {
1747                let context = self.build_checkin_context(hat_id);
1748                match robot_service.send_checkin(self.state.iteration, elapsed, Some(&context)) {
1749                    Ok(_) => {
1750                        self.state.last_checkin_at = Some(std::time::Instant::now());
1751                        debug!(iteration = self.state.iteration, "Sent robot check-in");
1752                    }
1753                    Err(e) => {
1754                        warn!(error = %e, "Failed to send robot check-in");
1755                    }
1756                }
1757            }
1758        }
1759
1760        // Log iteration started
1761        self.diagnostics.log_orchestration(
1762            self.state.iteration,
1763            "loop",
1764            crate::diagnostics::OrchestrationEvent::IterationStarted,
1765        );
1766
1767        // Log hat selected
1768        self.diagnostics.log_orchestration(
1769            self.state.iteration,
1770            "loop",
1771            crate::diagnostics::OrchestrationEvent::HatSelected {
1772                hat: hat_id.to_string(),
1773                reason: "process_output".to_string(),
1774            },
1775        );
1776
1777        // Track failures
1778        if success {
1779            self.state.consecutive_failures = 0;
1780        } else {
1781            self.state.consecutive_failures += 1;
1782        }
1783
1784        let _ = output;
1785
1786        // File-modification audit: detect when a hat with disallowed Edit/Write tools
1787        // modified files. This is hard enforcement — emits a scope_violation event.
1788        self.audit_file_modifications(hat_id);
1789
1790        // Events are ONLY read from the JSONL file written by `ralph emit`.
1791        // This enforces tool use and prevents confabulation (agent claiming to emit without actually doing so).
1792        // See process_events_from_jsonl() for event processing.
1793
1794        // Check termination conditions
1795        self.check_termination()
1796    }
1797
1798    /// Audits file modifications after a hat iteration.
1799    ///
1800    /// If the hat has `Edit` or `Write` in its `disallowed_tools`, checks whether
1801    /// files were modified (via `git diff --stat HEAD`). If so, emits a
1802    /// `<hat_id>.scope_violation` event.
1803    fn audit_file_modifications(&mut self, hat_id: &HatId) {
1804        let config = match self.registry.get_config(hat_id) {
1805            Some(c) => c,
1806            None => return,
1807        };
1808
1809        let has_write_restriction = config
1810            .disallowed_tools
1811            .iter()
1812            .any(|t| t == "Edit" || t == "Write");
1813
1814        if !has_write_restriction {
1815            return;
1816        }
1817
1818        let workspace = &self.config.core.workspace_root;
1819        let diff_output = std::process::Command::new("git")
1820            .args(["diff", "--stat", "HEAD"])
1821            .current_dir(workspace)
1822            .output();
1823
1824        match diff_output {
1825            Ok(output) if !output.stdout.is_empty() => {
1826                let diff_stat = String::from_utf8_lossy(&output.stdout).trim().to_string();
1827                warn!(
1828                    hat = %hat_id.as_str(),
1829                    diff = %diff_stat,
1830                    "Hat modified files despite tool restrictions (scope violation)"
1831                );
1832
1833                let violation_topic = format!("{}.scope_violation", hat_id.as_str());
1834                let violation = Event::new(
1835                    violation_topic.as_str(),
1836                    format!(
1837                        "Hat '{}' modified files with Edit/Write disallowed:\n{}",
1838                        hat_id.as_str(),
1839                        diff_stat
1840                    ),
1841                );
1842                self.bus.publish(violation);
1843            }
1844            Err(e) => {
1845                debug!(error = %e, "Could not run git diff for file-modification audit");
1846            }
1847            _ => {} // No modifications — all good
1848        }
1849    }
1850
/// Derives a task identifier from a `build.blocked` payload.
///
/// The identifier is the first line of the payload with surrounding
/// whitespace removed. A payload without any line yields `"unknown"`.
fn extract_task_id(payload: &str) -> String {
    let first_line = match payload.lines().next() {
        Some(line) => line,
        None => "unknown",
    };
    first_line.trim().to_owned()
}
1861
1862    /// Adds cost to the cumulative total.
1863    pub fn add_cost(&mut self, cost: f64) {
1864        self.state.cumulative_cost += cost;
1865    }
1866
1867    /// Verifies all tasks in scratchpad are complete or cancelled.
1868    ///
1869    /// Returns:
1870    /// - `Ok(true)` if all tasks are `[x]` or `[~]`
1871    /// - `Ok(false)` if any tasks are `[ ]` (pending)
1872    /// - `Err(...)` if scratchpad doesn't exist or can't be read
1873    fn verify_scratchpad_complete(&self) -> Result<bool, std::io::Error> {
1874        let scratchpad_path = self.scratchpad_path();
1875
1876        if !scratchpad_path.exists() {
1877            return Err(std::io::Error::new(
1878                std::io::ErrorKind::NotFound,
1879                "Scratchpad does not exist",
1880            ));
1881        }
1882
1883        let content = std::fs::read_to_string(scratchpad_path)?;
1884
1885        let has_pending = content
1886            .lines()
1887            .any(|line| line.trim_start().starts_with("- [ ]"));
1888
1889        Ok(!has_pending)
1890    }
1891
1892    /// Reads the current loop ID from the marker file.
1893    ///
1894    /// Returns `None` if no marker exists or is empty, which means
1895    /// task queries should be unfiltered (backwards compatible).
1896    fn current_loop_id(&self) -> Option<String> {
1897        self.loop_context
1898            .as_ref()
1899            .and_then(|ctx| {
1900                let marker_path = ctx.ralph_dir().join("current-loop-id");
1901                std::fs::read_to_string(&marker_path).ok()
1902            })
1903            .map(|id| id.trim().to_string())
1904            .filter(|id| !id.is_empty())
1905    }
1906
1907    /// Filters a task list by loop ID. When `loop_id` is `None`, returns all tasks.
1908    fn filter_tasks_by_loop<'a>(
1909        tasks: Vec<&'a crate::task::Task>,
1910        loop_id: Option<&str>,
1911    ) -> Vec<&'a crate::task::Task> {
1912        match loop_id {
1913            Some(id) => tasks
1914                .into_iter()
1915                .filter(|t| t.loop_id.as_deref() == Some(id))
1916                .collect(),
1917            None => tasks,
1918        }
1919    }
1920
1921    fn verify_tasks_complete(&self) -> Result<bool, std::io::Error> {
1922        use crate::task_store::TaskStore;
1923
1924        let tasks_path = self.tasks_path();
1925
1926        // No tasks file = no pending tasks = complete
1927        if !tasks_path.exists() {
1928            return Ok(true);
1929        }
1930
1931        let store = TaskStore::load(&tasks_path)?;
1932        let current_loop_id = self.current_loop_id();
1933        let open = Self::filter_tasks_by_loop(store.open(), current_loop_id.as_deref());
1934        Ok(open.is_empty())
1935    }
1936
1937    /// Builds a [`CheckinContext`] with current loop state for robot check-ins.
1938    fn build_checkin_context(&self, hat_id: &HatId) -> CheckinContext {
1939        let (open_tasks, closed_tasks) = self.count_tasks();
1940        CheckinContext {
1941            current_hat: Some(hat_id.as_str().to_string()),
1942            open_tasks,
1943            closed_tasks,
1944            cumulative_cost: self.state.cumulative_cost,
1945        }
1946    }
1947
1948    /// Counts open and closed tasks from the task store.
1949    ///
1950    /// Returns `(open_count, closed_count)`. "Open" means non-terminal tasks,
1951    /// "closed" means tasks with `TaskStatus::Closed`.
1952    fn count_tasks(&self) -> (usize, usize) {
1953        use crate::task_store::TaskStore;
1954
1955        let tasks_path = self.tasks_path();
1956        if !tasks_path.exists() {
1957            return (0, 0);
1958        }
1959
1960        match TaskStore::load(&tasks_path) {
1961            Ok(store) => {
1962                let current_loop_id = self.current_loop_id();
1963                let all = Self::filter_tasks_by_loop(
1964                    store.all().iter().collect(),
1965                    current_loop_id.as_deref(),
1966                );
1967                let open = Self::filter_tasks_by_loop(store.open(), current_loop_id.as_deref());
1968                let closed = all.len() - open.len();
1969                (open.len(), closed)
1970            }
1971            Err(_) => (0, 0),
1972        }
1973    }
1974
1975    /// Returns a list of open task descriptions for logging purposes.
1976    fn get_open_task_list(&self) -> Vec<String> {
1977        use crate::task_store::TaskStore;
1978
1979        let tasks_path = self.tasks_path();
1980        if let Ok(store) = TaskStore::load(&tasks_path) {
1981            let current_loop_id = self.current_loop_id();
1982            let open = Self::filter_tasks_by_loop(store.open(), current_loop_id.as_deref());
1983            return open
1984                .iter()
1985                .map(|t| format!("{}: {}", t.id, t.title))
1986                .collect();
1987        }
1988        vec![]
1989    }
1990
1991    fn warn_on_mutation_evidence(&self, evidence: &crate::event_parser::BackpressureEvidence) {
1992        let threshold = self.config.event_loop.mutation_score_warn_threshold;
1993
1994        match &evidence.mutants {
1995            Some(mutants) => {
1996                if let Some(reason) = Self::mutation_warning_reason(mutants, threshold) {
1997                    warn!(
1998                        reason = %reason,
1999                        mutants_status = ?mutants.status,
2000                        mutants_score = mutants.score_percent,
2001                        mutants_threshold = threshold,
2002                        "Mutation testing warning"
2003                    );
2004                }
2005            }
2006            None => {
2007                if let Some(threshold) = threshold {
2008                    warn!(
2009                        mutants_threshold = threshold,
2010                        "Mutation testing warning: missing mutation evidence in build.done payload"
2011                    );
2012                }
2013            }
2014        }
2015    }
2016
2017    fn mutation_warning_reason(
2018        mutants: &MutationEvidence,
2019        threshold: Option<f64>,
2020    ) -> Option<String> {
2021        match mutants.status {
2022            MutationStatus::Fail => Some("mutation testing failed".to_string()),
2023            MutationStatus::Warn => Some(Self::format_mutation_message(
2024                "mutation score below threshold",
2025                mutants.score_percent,
2026            )),
2027            MutationStatus::Unknown => Some("mutation testing status unknown".to_string()),
2028            MutationStatus::Pass => {
2029                let threshold = threshold?;
2030
2031                match mutants.score_percent {
2032                    Some(score) if score < threshold => Some(format!(
2033                        "mutation score {:.2}% below threshold {:.2}%",
2034                        score, threshold
2035                    )),
2036                    Some(_) => None,
2037                    None => Some(format!(
2038                        "mutation score missing (threshold {:.2}%)",
2039                        threshold
2040                    )),
2041                }
2042            }
2043        }
2044    }
2045
/// Returns `message`, followed by the score as ` (NN.NN%)` when a score is
/// available.
fn format_mutation_message(message: &str, score: Option<f64>) -> String {
    if let Some(percent) = score {
        format!("{message} ({percent:.2}%)")
    } else {
        message.to_string()
    }
}
2052
2053    fn parse_human_interact_context(payload: &str) -> Value {
2054        let mut context = match serde_json::from_str::<Value>(payload) {
2055            Ok(Value::Object(map)) => map,
2056            Ok(value) => {
2057                let mut map = Map::new();
2058                map.insert("question".to_string(), value);
2059                map
2060            }
2061            Err(_) => {
2062                let mut map = Map::new();
2063                map.insert("question".to_string(), Value::String(payload.to_string()));
2064                map
2065            }
2066        };
2067
2068        if !context.contains_key("question") {
2069            context.insert("question".to_string(), Value::String(payload.to_string()));
2070        }
2071
2072        Value::Object(context)
2073    }
2074
/// Returns `true` when the payload contains, ignoring ASCII case, either
/// `restart yourself` or `restart ralph`.
fn is_restart_request_payload(payload: &str) -> bool {
    let lowered = payload.to_ascii_lowercase();
    ["restart yourself", "restart ralph"]
        .iter()
        .any(|phrase| lowered.contains(phrase))
}
2079
2080    fn is_restart_request_event(event: &Event) -> bool {
2081        matches!(event.topic.as_str(), "human.response" | "user.prompt")
2082            && Self::is_restart_request_payload(&event.payload)
2083    }
2084
2085    fn mark_restart_requested(&self, source: &str) {
2086        let restart_path =
2087            std::path::Path::new(&self.config.core.workspace_root).join(".ralph/restart-requested");
2088
2089        if let Some(parent) = restart_path.parent()
2090            && let Err(err) = std::fs::create_dir_all(parent)
2091        {
2092            warn!(
2093                error = %err,
2094                path = %parent.display(),
2095                "Failed to create restart-requested parent directory"
2096            );
2097            return;
2098        }
2099
2100        if let Err(err) = std::fs::write(&restart_path, source) {
2101            warn!(
2102                error = %err,
2103                path = %restart_path.display(),
2104                "Failed to write restart-requested signal"
2105            );
2106            return;
2107        }
2108
2109        info!(
2110            source,
2111            path = %restart_path.display(),
2112            "Restart requested from human text"
2113        );
2114    }
2115
2116    /// Processes events from JSONL and routes orphaned events to Ralph.
2117    ///
2118    /// Also handles backpressure for malformed JSONL lines by:
2119    /// 1. Emitting `event.malformed` system events for each parse failure
2120    /// 2. Tracking consecutive failures for termination check
2121    /// 3. Resetting counter when valid events are parsed
2122    ///
2123    /// Returns [`ProcessedEvents`] indicating whether events were found, whether
2124    /// semantic `plan.*` topics were published, structured `human.interact`
2125    /// context/outcome metadata, and whether any were orphans that Ralph should
2126    /// handle.
2127    pub fn process_events_from_jsonl(&mut self) -> std::io::Result<ProcessedEvents> {
2128        let result = self.event_reader.read_new_events()?;
2129        self.process_parse_result(result)
2130    }
2131
2132    /// Inner event processing that operates on an already-parsed `ParseResult`.
2133    ///
2134    /// This is the single source of truth for event validation, backpressure,
2135    /// scope enforcement, and bus publishing. Both `process_events_from_jsonl`
2136    /// and `process_events_from_jsonl_with_waves` delegate to this method.
2137    fn process_parse_result(
2138        &mut self,
2139        result: crate::event_reader::ParseResult,
2140    ) -> std::io::Result<ProcessedEvents> {
2141        // Handle malformed lines with backpressure
2142        for malformed in &result.malformed {
2143            let payload = format!(
2144                "Line {}: {}\nContent: {}",
2145                malformed.line_number, malformed.error, &malformed.content
2146            );
2147            let event = Event::new("event.malformed", &payload);
2148            self.bus.publish(event);
2149            self.state.consecutive_malformed_events += 1;
2150            warn!(
2151                line = malformed.line_number,
2152                consecutive = self.state.consecutive_malformed_events,
2153                "Malformed event line detected"
2154            );
2155        }
2156
2157        // Reset counter when valid events are parsed
2158        if !result.events.is_empty() {
2159            self.state.consecutive_malformed_events = 0;
2160        }
2161
2162        if result.events.is_empty() && result.malformed.is_empty() {
2163            return Ok(ProcessedEvents {
2164                had_events: false,
2165                had_plan_events: false,
2166                human_interact_context: None,
2167                has_orphans: false,
2168            });
2169        }
2170
2171        // --- Scope enforcement: filter events against active hat's publishes ---
2172        // Only active when enforce_hat_scope is true in config (opt-in).
2173        let events = if self.config.event_loop.enforce_hat_scope {
2174            let active_hats = self.state.last_active_hat_ids.clone();
2175            let (in_scope, out_of_scope): (Vec<_>, Vec<_>) =
2176                result.events.into_iter().partition(|event| {
2177                    if active_hats.is_empty() {
2178                        return true; // Ralph coordinating — no scope restriction
2179                    }
2180                    active_hats
2181                        .iter()
2182                        .any(|hat_id| self.registry.can_publish(hat_id, event.topic.as_str()))
2183                });
2184
2185            for event in &out_of_scope {
2186                let violation_hat = active_hats.first().map(|h| h.as_str()).unwrap_or("unknown");
2187                warn!(
2188                    active_hats = ?active_hats,
2189                    topic = %event.topic,
2190                    "Scope violation: active hat(s) cannot publish this topic — dropping event"
2191                );
2192                let violation_topic = format!("{}.scope_violation", violation_hat);
2193                let violation_payload = format!(
2194                    "Attempted to publish '{}': {}",
2195                    event.topic,
2196                    event.payload.clone().unwrap_or_default()
2197                );
2198                let violation = Event::new(violation_topic, violation_payload);
2199                self.bus.publish(violation);
2200            }
2201
2202            in_scope
2203        } else {
2204            result.events
2205        };
2206        // --- End scope enforcement ---
2207
2208        let mut has_orphans = false;
2209
2210        // Validate and transform events (apply backpressure for build.done)
2211        let mut validated_events = Vec::new();
2212        let completion_topic = self.config.event_loop.completion_promise.as_str();
2213        let cancellation_topic = self.config.event_loop.cancellation_promise.clone();
2214        let total_events = events.len();
2215        for (index, event) in events.into_iter().enumerate() {
2216            let payload = event.payload.clone().unwrap_or_default();
2217
2218            // Detect loop.cancel — unconditional graceful termination
2219            if !cancellation_topic.is_empty() && event.topic.as_str() == cancellation_topic {
2220                info!(
2221                    payload = %payload,
2222                    "loop.cancel event detected — scheduling graceful termination"
2223                );
2224                self.state.cancellation_requested = true;
2225                // Continue processing remaining events (they may contain cleanup info)
2226                continue;
2227            }
2228
2229            if event.topic == completion_topic {
2230                if index + 1 == total_events {
2231                    self.state.completion_requested = true;
2232                    self.diagnostics.log_orchestration(
2233                        self.state.iteration,
2234                        "jsonl",
2235                        crate::diagnostics::OrchestrationEvent::EventPublished {
2236                            topic: event.topic.clone(),
2237                        },
2238                    );
2239                    info!(
2240                        topic = %event.topic,
2241                        "Completion event detected in JSONL"
2242                    );
2243                } else {
2244                    warn!(
2245                        topic = %event.topic,
2246                        index = index,
2247                        total_events = total_events,
2248                        "Completion event ignored because it was not the last event"
2249                    );
2250                }
2251                continue;
2252            }
2253
2254            if event.topic == "build.done" {
2255                // Validate build.done events have backpressure evidence
2256                if let Some(evidence) = EventParser::parse_backpressure_evidence(&payload) {
2257                    if evidence.all_passed() {
2258                        self.warn_on_mutation_evidence(&evidence);
2259                        validated_events.push(Event::new(event.topic.as_str(), &payload));
2260                    } else {
2261                        // Evidence present but checks failed - synthesize build.blocked
2262                        warn!(
2263                            tests = evidence.tests_passed,
2264                            lint = evidence.lint_passed,
2265                            typecheck = evidence.typecheck_passed,
2266                            audit = evidence.audit_passed,
2267                            coverage = evidence.coverage_passed,
2268                            complexity = evidence.complexity_score,
2269                            duplication = evidence.duplication_passed,
2270                            performance = evidence.performance_regression,
2271                            specs = evidence.specs_verified,
2272                            "build.done rejected: backpressure checks failed"
2273                        );
2274
2275                        let complexity = evidence
2276                            .complexity_score
2277                            .map(|value| format!("{value:.2}"))
2278                            .unwrap_or_else(|| "missing".to_string());
2279                        let performance = match evidence.performance_regression {
2280                            Some(true) => "regression".to_string(),
2281                            Some(false) => "pass".to_string(),
2282                            None => "missing".to_string(),
2283                        };
2284                        let specs = match evidence.specs_verified {
2285                            Some(true) => "pass".to_string(),
2286                            Some(false) => "fail".to_string(),
2287                            None => "not reported".to_string(),
2288                        };
2289
2290                        self.diagnostics.log_orchestration(
2291                            self.state.iteration,
2292                            "jsonl",
2293                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2294                                reason: format!(
2295                                    "backpressure checks failed: tests={}, lint={}, typecheck={}, audit={}, coverage={}, complexity={}, duplication={}, performance={}, specs={}",
2296                                    evidence.tests_passed,
2297                                    evidence.lint_passed,
2298                                    evidence.typecheck_passed,
2299                                    evidence.audit_passed,
2300                                    evidence.coverage_passed,
2301                                    complexity,
2302                                    evidence.duplication_passed,
2303                                    performance,
2304                                    specs
2305                                ),
2306                            },
2307                        );
2308
2309                        validated_events.push(Event::new(
2310                            "build.blocked",
2311                            "Backpressure checks failed. Fix tests/lint/typecheck/audit/coverage/complexity/duplication/specs before emitting build.done.",
2312                        ));
2313                    }
2314                } else {
2315                    // No evidence found - synthesize build.blocked
2316                    warn!("build.done rejected: missing backpressure evidence");
2317
2318                    self.diagnostics.log_orchestration(
2319                        self.state.iteration,
2320                        "jsonl",
2321                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2322                            reason: "missing backpressure evidence".to_string(),
2323                        },
2324                    );
2325
2326                    validated_events.push(Event::new(
2327                        "build.blocked",
2328                        "Missing backpressure evidence. Include 'tests: pass', 'lint: pass', 'typecheck: pass', 'audit: pass', 'coverage: pass', 'complexity: <score>', 'duplication: pass', 'performance: pass' (optional), 'specs: pass' (optional) in build.done payload.",
2329                    ));
2330                }
2331            } else if event.topic == "review.done" && !event.is_wave_event() {
2332                // Validate review.done events have verification evidence.
2333                // Wave worker events skip this — wave reviews are read-only
2334                // and don't run tests/builds.
2335                if let Some(evidence) = EventParser::parse_review_evidence(&payload) {
2336                    if evidence.is_verified() {
2337                        validated_events.push(Event::new(event.topic.as_str(), &payload));
2338                    } else {
2339                        // Evidence present but checks failed - synthesize review.blocked
2340                        warn!(
2341                            tests = evidence.tests_passed,
2342                            build = evidence.build_passed,
2343                            "review.done rejected: verification checks failed"
2344                        );
2345
2346                        self.diagnostics.log_orchestration(
2347                            self.state.iteration,
2348                            "jsonl",
2349                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2350                                reason: format!(
2351                                    "review verification failed: tests={}, build={}",
2352                                    evidence.tests_passed, evidence.build_passed
2353                                ),
2354                            },
2355                        );
2356
2357                        validated_events.push(Event::new(
2358                            "review.blocked",
2359                            "Review verification failed. Run tests and build before emitting review.done.",
2360                        ));
2361                    }
2362                } else {
2363                    // No evidence found - synthesize review.blocked
2364                    warn!("review.done rejected: missing verification evidence");
2365
2366                    self.diagnostics.log_orchestration(
2367                        self.state.iteration,
2368                        "jsonl",
2369                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2370                            reason: "missing review verification evidence".to_string(),
2371                        },
2372                    );
2373
2374                    validated_events.push(Event::new(
2375                        "review.blocked",
2376                        "Missing verification evidence. Include 'tests: pass' and 'build: pass' in review.done payload.",
2377                    ));
2378                }
2379            } else if event.topic == "verify.passed" {
2380                if let Some(report) = EventParser::parse_quality_report(&payload) {
2381                    if report.meets_thresholds() {
2382                        validated_events.push(Event::new(event.topic.as_str(), &payload));
2383                    } else {
2384                        let failed = report.failed_dimensions();
2385                        let reason = if failed.is_empty() {
2386                            "quality thresholds failed".to_string()
2387                        } else {
2388                            format!("quality thresholds failed: {}", failed.join(", "))
2389                        };
2390
2391                        warn!(
2392                            failed_dimensions = ?failed,
2393                            "verify.passed rejected: quality thresholds failed"
2394                        );
2395
2396                        self.diagnostics.log_orchestration(
2397                            self.state.iteration,
2398                            "jsonl",
2399                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2400                                reason,
2401                            },
2402                        );
2403
2404                        validated_events.push(Event::new(
2405                            "verify.failed",
2406                            "Quality thresholds failed. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity with thresholds in verify.passed payload.",
2407                        ));
2408                    }
2409                } else {
2410                    // No quality report found - synthesize verify.failed
2411                    warn!("verify.passed rejected: missing quality report");
2412
2413                    self.diagnostics.log_orchestration(
2414                        self.state.iteration,
2415                        "jsonl",
2416                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2417                            reason: "missing quality report".to_string(),
2418                        },
2419                    );
2420
2421                    validated_events.push(Event::new(
2422                        "verify.failed",
2423                        "Missing quality report. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity in verify.passed payload.",
2424                    ));
2425                }
2426            } else if event.topic == "verify.failed" {
2427                if EventParser::parse_quality_report(&payload).is_none() {
2428                    warn!("verify.failed missing quality report");
2429                }
2430                validated_events.push(Event::new(event.topic.as_str(), &payload));
2431            } else {
2432                // Non-backpressure events pass through unchanged
2433                validated_events.push(Event::new(event.topic.as_str(), &payload));
2434            }
2435        }
2436
2437        // Track build.blocked events for thrashing detection
2438        let blocked_events: Vec<_> = validated_events
2439            .iter()
2440            .filter(|e| e.topic == "build.blocked".into())
2441            .collect();
2442
2443        for blocked_event in &blocked_events {
2444            let task_id = Self::extract_task_id(&blocked_event.payload);
2445
2446            let count = self
2447                .state
2448                .task_block_counts
2449                .entry(task_id.clone())
2450                .or_insert(0);
2451            *count += 1;
2452
2453            debug!(
2454                task_id = %task_id,
2455                block_count = *count,
2456                "Task blocked"
2457            );
2458
2459            // After 3 blocks on same task, emit build.task.abandoned
2460            if *count >= 3 && !self.state.abandoned_tasks.contains(&task_id) {
2461                warn!(
2462                    task_id = %task_id,
2463                    "Task abandoned after 3 consecutive blocks"
2464                );
2465
2466                self.state.abandoned_tasks.push(task_id.clone());
2467
2468                self.diagnostics.log_orchestration(
2469                    self.state.iteration,
2470                    "jsonl",
2471                    crate::diagnostics::OrchestrationEvent::TaskAbandoned {
2472                        reason: format!(
2473                            "3 consecutive build.blocked events for task '{}'",
2474                            task_id
2475                        ),
2476                    },
2477                );
2478
2479                let abandoned_event = Event::new(
2480                    "build.task.abandoned",
2481                    format!(
2482                        "Task '{}' abandoned after 3 consecutive build.blocked events",
2483                        task_id
2484                    ),
2485                );
2486
2487                self.bus.publish(abandoned_event);
2488            }
2489        }
2490
2491        // Track hat-level blocking for legacy thrashing detection
2492        let has_blocked_event = !blocked_events.is_empty();
2493
2494        if has_blocked_event {
2495            self.state.consecutive_blocked += 1;
2496        } else {
2497            self.state.consecutive_blocked = 0;
2498            self.state.last_blocked_hat = None;
2499        }
2500
2501        // Handle human.interact blocking behavior:
2502        // When a human.interact event is detected and robot service is active,
2503        // send the question and block until human.response or timeout.
2504        let mut response_event = None;
2505        let mut human_interact_context = None;
2506        let ask_human_idx = validated_events
2507            .iter()
2508            .position(|e| e.topic == "human.interact".into());
2509
2510        if let Some(idx) = ask_human_idx {
2511            let ask_event = &validated_events[idx];
2512            let payload = ask_event.payload.clone();
2513
2514            let mut context = match Self::parse_human_interact_context(&payload) {
2515                Value::Object(map) => map,
2516                _ => Map::new(),
2517            };
2518
2519            if let Some(ref robot_service) = self.robot_service {
2520                info!(
2521                    payload = %payload,
2522                    "human.interact event detected — sending question via robot service"
2523                );
2524
2525                // Send the question (includes retry with exponential backoff)
2526                let send_ok = match robot_service.send_question(&payload) {
2527                    Ok(_message_id) => true,
2528                    Err(e) => {
2529                        warn!(
2530                            error = %e,
2531                            "Failed to send human.interact question after retries — treating as timeout"
2532                        );
2533                        // Log to diagnostics
2534                        self.diagnostics.log_error(
2535                            self.state.iteration,
2536                            "telegram",
2537                            crate::diagnostics::DiagnosticError::TelegramSendError {
2538                                operation: "send_question".to_string(),
2539                                error: e.to_string(),
2540                                retry_count: 3,
2541                            },
2542                        );
2543                        context.insert(
2544                            "outcome".to_string(),
2545                            Value::String("send_failure".to_string()),
2546                        );
2547                        context.insert("error".to_string(), Value::String(e.to_string()));
2548                        false
2549                    }
2550                };
2551
2552                // Block: poll events file for human.response
2553                // Per spec, even on send failure we treat as timeout (continue without blocking)
2554                if send_ok {
2555                    // Read the active events path from the current-events marker,
2556                    // falling back to the default events.jsonl if not available.
2557                    let events_path = self
2558                        .loop_context
2559                        .as_ref()
2560                        .and_then(|ctx| {
2561                            std::fs::read_to_string(ctx.current_events_marker())
2562                                .ok()
2563                                .map(|s| ctx.workspace().join(s.trim()))
2564                        })
2565                        .or_else(|| {
2566                            std::fs::read_to_string(".ralph/current-events")
2567                                .ok()
2568                                .map(|s| PathBuf::from(s.trim()))
2569                        })
2570                        .unwrap_or_else(|| {
2571                            self.loop_context
2572                                .as_ref()
2573                                .map(|ctx| ctx.events_path())
2574                                .unwrap_or_else(|| PathBuf::from(".ralph/events.jsonl"))
2575                        });
2576
2577                    match robot_service.wait_for_response(&events_path) {
2578                        Ok(Some(response)) => {
2579                            info!(
2580                                response = %response,
2581                                "Received human.response — continuing loop"
2582                            );
2583                            context.insert(
2584                                "outcome".to_string(),
2585                                Value::String("response".to_string()),
2586                            );
2587                            context.insert("response".to_string(), Value::String(response.clone()));
2588                            // Create a human.response event to inject into the bus
2589                            response_event = Some(Event::new("human.response", &response));
2590                        }
2591                        Ok(None) => {
2592                            warn!(
2593                                timeout_secs = robot_service.timeout_secs(),
2594                                "Human response timeout — injecting human.timeout event"
2595                            );
2596                            context.insert(
2597                                "outcome".to_string(),
2598                                Value::String("timeout".to_string()),
2599                            );
2600                            context.insert(
2601                                "timeout_seconds".to_string(),
2602                                Value::from(robot_service.timeout_secs()),
2603                            );
2604                            let timeout_event = Event::new(
2605                                "human.timeout",
2606                                format!(
2607                                    "No response after {}s. Original question: {}",
2608                                    robot_service.timeout_secs(),
2609                                    payload
2610                                ),
2611                            );
2612                            response_event = Some(timeout_event);
2613                        }
2614                        Err(e) => {
2615                            warn!(
2616                                error = %e,
2617                                "Error waiting for human response — injecting human.timeout event"
2618                            );
2619                            context.insert(
2620                                "outcome".to_string(),
2621                                Value::String("wait_error".to_string()),
2622                            );
2623                            context.insert("error".to_string(), Value::String(e.to_string()));
2624                            let timeout_event = Event::new(
2625                                "human.timeout",
2626                                format!(
2627                                    "Error waiting for response: {}. Original question: {}",
2628                                    e, payload
2629                                ),
2630                            );
2631                            response_event = Some(timeout_event);
2632                        }
2633                    }
2634                }
2635            } else {
2636                debug!(
2637                    "human.interact event detected but no robot service active — passing through"
2638                );
2639                context.insert(
2640                    "outcome".to_string(),
2641                    Value::String("no_robot_service".to_string()),
2642                );
2643            }
2644
2645            human_interact_context = Some(Value::Object(context));
2646        }
2647
2648        let restart_requested = validated_events.iter().any(Self::is_restart_request_event)
2649            || response_event
2650                .as_ref()
2651                .is_some_and(Self::is_restart_request_event);
2652        if restart_requested {
2653            self.mark_restart_requested("human_text");
2654        }
2655
2656        // Track whether any events will be published (before the loop consumes them).
2657        let had_events = !validated_events.is_empty();
2658        let had_plan_events = validated_events
2659            .iter()
2660            .any(|event| event.topic.as_str().starts_with("plan."));
2661
2662        // Publish validated events to the bus.
2663        // Ralph is always registered with subscribe("*"), so every event has at least
2664        // one subscriber. Events without a specific hat subscriber are "orphaned" —
2665        // Ralph handles them as the universal fallback.
2666        for event in validated_events {
2667            // Record topic for event chain validation
2668            self.state.record_event(&event);
2669
2670            self.diagnostics.log_orchestration(
2671                self.state.iteration,
2672                "jsonl",
2673                crate::diagnostics::OrchestrationEvent::EventPublished {
2674                    topic: event.topic.to_string(),
2675                },
2676            );
2677
2678            if !self.registry.has_subscriber(event.topic.as_str()) {
2679                has_orphans = true;
2680            }
2681
2682            debug!(
2683                topic = %event.topic,
2684                "Publishing event from JSONL"
2685            );
2686            self.bus.publish(event);
2687        }
2688
2689        // Publish human.response event if one was received during blocking
2690        if let Some(response) = response_event {
2691            self.state.record_event(&response);
2692            info!(
2693                topic = %response.topic,
2694                "Publishing human.response event from robot service"
2695            );
2696            self.bus.publish(response);
2697        }
2698
2699        Ok(ProcessedEvents {
2700            had_events,
2701            had_plan_events,
2702            human_interact_context,
2703            has_orphans,
2704        })
2705    }
2706
2707    /// Process events from JSONL, partitioning wave events from regular events.
2708    ///
2709    /// Wave events (those with `wave_id` set and targeting a concurrent hat) are
2710    /// extracted and returned separately. Regular events go through the full
2711    /// backpressure pipeline via `process_parse_result`.
2712    pub fn process_events_from_jsonl_with_waves(
2713        &mut self,
2714    ) -> std::io::Result<ProcessedEventsWithWaves> {
2715        let result = self.event_reader.read_new_events()?;
2716
2717        // Partition: wave dispatch events vs regular events.
2718        // Only events that target a concurrent hat (concurrency > 1) are wave dispatches.
2719        // Wave *results* (e.g. review.done) have wave_id set but should be treated as
2720        // regular events so they reach the bus and trigger downstream hats (e.g. aggregator).
2721        //
2722        // Uses find_by_trigger + get_config — the same resolution path as
2723        // detect_wave_events — to ensure partition and detection agree.
2724        let (wave_events, regular_events): (Vec<_>, Vec<_>) =
2725            result.events.into_iter().partition(|e| {
2726                e.wave_id.is_some()
2727                    && self
2728                        .registry
2729                        .find_by_trigger(e.topic.as_str())
2730                        .and_then(|hat_id| self.registry.get_config(hat_id))
2731                        .is_some_and(|hat_config| hat_config.concurrency > 1)
2732            });
2733
2734        if !wave_events.is_empty() {
2735            debug!(
2736                wave_count = wave_events.len(),
2737                regular_count = regular_events.len(),
2738                "Partitioned wave events from regular events"
2739            );
2740        }
2741
2742        // Delegate regular events to the full pipeline (backpressure, scope
2743        // enforcement, human.interact, plan detection, etc.)
2744        let regular_result = crate::event_reader::ParseResult {
2745            events: regular_events,
2746            malformed: result.malformed,
2747        };
2748        let processed = self.process_parse_result(regular_result)?;
2749
2750        Ok(ProcessedEventsWithWaves {
2751            processed,
2752            wave_events,
2753        })
2754    }
2755
2756    /// Checks if output contains a completion event from Ralph.
2757    ///
2758    /// Completion must be emitted as an `<event>` tag, not plain text.
2759    pub fn check_ralph_completion(&self, output: &str) -> bool {
2760        let events = EventParser::new().parse(output);
2761        events
2762            .iter()
2763            .any(|event| event.topic.as_str() == self.config.event_loop.completion_promise)
2764    }
2765
2766    /// Publishes the loop.terminate system event to observers.
2767    ///
2768    /// Per spec: "Published by the orchestrator (not agents) when the loop exits."
2769    /// This is an observer-only event—hats cannot trigger on it.
2770    ///
2771    /// Returns the event for logging purposes.
2772    pub fn publish_terminate_event(&mut self, reason: &TerminationReason) -> Event {
2773        // Stop the robot service if it was running
2774        self.stop_robot_service();
2775
2776        let elapsed = self.state.elapsed();
2777        let duration_str = format_duration(elapsed);
2778
2779        let payload = format!(
2780            "## Reason\n{}\n\n## Status\n{}\n\n## Summary\n- Iterations: {}\n- Duration: {}\n- Exit code: {}",
2781            reason.as_str(),
2782            termination_status_text(reason),
2783            self.state.iteration,
2784            duration_str,
2785            reason.exit_code()
2786        );
2787
2788        let event = Event::new("loop.terminate", &payload);
2789
2790        // Publish to bus for observers (but no hat can trigger on this)
2791        self.bus.publish(event.clone());
2792
2793        info!(
2794            reason = %reason.as_str(),
2795            iterations = self.state.iteration,
2796            duration = %duration_str,
2797            "Wrapping up: {}. {} iterations in {}.",
2798            reason.as_str(),
2799            self.state.iteration,
2800            duration_str
2801        );
2802
2803        event
2804    }
2805
2806    /// Returns the robot service's shutdown flag, if active.
2807    ///
2808    /// Signal handlers can set this flag to interrupt `wait_for_response()`
2809    /// without waiting for the full timeout.
2810    pub fn robot_shutdown_flag(&self) -> Option<Arc<AtomicBool>> {
2811        self.robot_service.as_ref().map(|s| s.shutdown_flag())
2812    }
2813
2814    /// Stops the robot service if it's running.
2815    ///
2816    /// Called during loop termination to cleanly shut down the communication backend.
2817    fn stop_robot_service(&mut self) {
2818        if let Some(service) = self.robot_service.take() {
2819            service.stop();
2820        }
2821    }
2822
2823    // -------------------------------------------------------------------------
2824    // Human-in-the-loop planning support
2825    // -------------------------------------------------------------------------
2826
2827    /// Check if any event is a `user.prompt` event.
2828    ///
2829    /// Returns the first user prompt event found, or None.
2830    pub fn check_for_user_prompt(&self, events: &[Event]) -> Option<UserPrompt> {
2831        events
2832            .iter()
2833            .find(|e| e.topic.as_str() == "user.prompt")
2834            .map(|e| UserPrompt {
2835                id: Self::extract_prompt_id(&e.payload),
2836                text: e.payload.clone(),
2837            })
2838    }
2839
2840    /// Extract a prompt ID from the event payload.
2841    ///
2842    /// Supports both XML attribute format: `<event topic="user.prompt" id="q1">...</event>`
2843    /// and JSON format in payload.
2844    fn extract_prompt_id(payload: &str) -> String {
2845        // Try to extract id attribute from XML-like format first
2846        if let Some(start) = payload.find("id=\"")
2847            && let Some(end) = payload[start + 4..].find('"')
2848        {
2849            return payload[start + 4..start + 4 + end].to_string();
2850        }
2851
2852        // Fallback: generate a simple ID based on timestamp
2853        format!("q{}", Self::generate_prompt_id())
2854    }
2855
2856    /// Generate a simple unique ID for prompts.
2857    /// Uses timestamp-based generation since uuid crate isn't available.
2858    fn generate_prompt_id() -> String {
2859        use std::time::{SystemTime, UNIX_EPOCH};
2860        let nanos = SystemTime::now()
2861            .duration_since(UNIX_EPOCH)
2862            .unwrap()
2863            .as_nanos();
2864        format!("{:x}", nanos % 0xFFFF_FFFF)
2865    }
2866}
2867
/// A question from the agent that needs an answer from a human.
///
/// Built from a `user.prompt` event emitted during planning.
#[derive(Clone, Debug)]
pub struct UserPrompt {
    /// Identifier for this prompt (e.g., "q1", "q2").
    pub id: String,
    /// The prompt/question text.
    pub text: String,
}
2878
/// Renders a duration as a compact `Xh Ym Zs` string.
///
/// Sub-second precision is discarded. Leading units that are zero are
/// omitted: under a minute yields `"Zs"`, under an hour yields `"Ym Zs"`.
/// Once hours are present, minutes and seconds are always shown, even if zero.
fn format_duration(d: Duration) -> String {
    let whole_secs = d.as_secs();
    let (hours, within_hour) = (whole_secs / 3600, whole_secs % 3600);
    let (minutes, seconds) = (within_hour / 60, within_hour % 60);

    match (hours, minutes) {
        (0, 0) => format!("{}s", seconds),
        (0, _) => format!("{}m {}s", minutes, seconds),
        _ => format!("{}h {}m {}s", hours, minutes, seconds),
    }
}
2894
2895/// Returns a human-readable status based on termination reason.
2896fn termination_status_text(reason: &TerminationReason) -> &'static str {
2897    match reason {
2898        TerminationReason::CompletionPromise => "All tasks completed successfully.",
2899        TerminationReason::MaxIterations => "Stopped at iteration limit.",
2900        TerminationReason::MaxRuntime => "Stopped at runtime limit.",
2901        TerminationReason::MaxCost => "Stopped at cost limit.",
2902        TerminationReason::ConsecutiveFailures => "Too many consecutive failures.",
2903        TerminationReason::LoopThrashing => {
2904            "Loop thrashing detected - same hat repeatedly blocked."
2905        }
2906        TerminationReason::LoopStale => {
2907            "Stale loop detected - same topic emitted 3+ times consecutively."
2908        }
2909        TerminationReason::ValidationFailure => "Too many consecutive malformed JSONL events.",
2910        TerminationReason::Stopped => "Manually stopped.",
2911        TerminationReason::Interrupted => "Interrupted by signal.",
2912        TerminationReason::RestartRequested => "Restarting by human request.",
2913        TerminationReason::WorkspaceGone => "Workspace directory removed externally.",
2914        TerminationReason::Cancelled => "Cancelled gracefully (human rejection or timeout).",
2915    }
2916}