Skip to main content

ralph_core/event_loop/
mod.rs

1//! Event loop orchestration.
2//!
3//! The event loop coordinates the execution of hats via pub/sub messaging.
4
5mod loop_state;
6#[cfg(test)]
7mod tests;
8
9pub use loop_state::LoopState;
10
11use crate::config::{HatBackend, InjectMode, RalphConfig};
12use crate::event_parser::{EventParser, MutationEvidence, MutationStatus};
13use crate::event_reader::EventReader;
14use crate::hat_registry::HatRegistry;
15use crate::hatless_ralph::HatlessRalph;
16use crate::instructions::InstructionBuilder;
17use crate::loop_context::LoopContext;
18use crate::memory_store::{MarkdownMemoryStore, format_memories_as_markdown, truncate_to_budget};
19use crate::skill_registry::SkillRegistry;
20use crate::text::floor_char_boundary;
21use ralph_proto::{CheckinContext, Event, EventBus, Hat, HatId, RobotService};
22use serde_json::{Map, Value};
23use std::path::PathBuf;
24use std::sync::Arc;
25use std::sync::atomic::AtomicBool;
26use std::time::Duration;
27use tracing::{debug, info, warn};
28
/// Result of processing events from JSONL.
///
/// Every field summarizes the processed batch as a whole rather than a
/// single event.
#[derive(Debug, Clone)]
pub struct ProcessedEvents {
    /// Whether any valid events were found and published.
    pub had_events: bool,
    /// Whether any published events matched the semantic `plan.*` topic family.
    pub had_plan_events: bool,
    /// Structured context for the first processed `human.interact` event,
    /// including the question payload and post-dispatch outcome metadata.
    /// `None` when the batch contained no such event.
    pub human_interact_context: Option<Value>,
    /// Whether any events lacked specific hat subscribers (orphans handled by Ralph).
    pub has_orphans: bool,
}
42
/// Reason the event loop terminated.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationReason {
    /// Completion promise was detected in output.
    CompletionPromise,
    /// Maximum iterations reached.
    MaxIterations,
    /// Maximum runtime exceeded.
    MaxRuntime,
    /// Maximum cost exceeded.
    MaxCost,
    /// Too many consecutive failures.
    ConsecutiveFailures,
    /// Loop thrashing detected (repeated blocked events).
    LoopThrashing,
    /// Stale loop detected (same topic emitted 3+ times consecutively).
    LoopStale,
    /// Too many consecutive malformed JSONL lines in events file.
    ValidationFailure,
    /// Manually stopped.
    Stopped,
    /// Interrupted by signal (SIGINT/SIGTERM).
    Interrupted,
    /// Restart requested via Telegram `/restart` command.
    RestartRequested,
    /// Workspace directory (worktree) was removed externally.
    WorkspaceGone,
    /// Loop was cancelled gracefully via loop.cancel event (human rejection, timeout).
    Cancelled,
}

impl TerminationReason {
    /// Maps this termination reason to the process exit code.
    ///
    /// - `0`: completion promise detected, or a graceful cancellation
    /// - `1`: failure (consecutive failures, thrashing, stale loop,
    ///   validation failure, manual stop, workspace removed)
    /// - `2`: a limit was hit (max iterations, max runtime, or max cost)
    /// - `3`: restart requested; signals the caller to exec-replace
    /// - `130`: user interrupt (SIGINT = 128 + 2)
    pub fn exit_code(&self) -> i32 {
        match self {
            // Clean exits: the loop finished or was intentionally cancelled.
            Self::CompletionPromise | Self::Cancelled => 0,
            // Failures.
            Self::ConsecutiveFailures
            | Self::LoopThrashing
            | Self::LoopStale
            | Self::ValidationFailure
            | Self::Stopped
            | Self::WorkspaceGone => 1,
            // Configured limits.
            Self::MaxIterations | Self::MaxRuntime | Self::MaxCost => 2,
            // Restart uses exit code 3 to signal the caller to exec-replace.
            Self::RestartRequested => 3,
            Self::Interrupted => 130,
        }
    }

    /// Returns the snake_case reason string used in the `loop.terminate`
    /// event payload (e.g. `completed`, `max_iterations`, `interrupted`).
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::CompletionPromise => "completed",
            Self::MaxIterations => "max_iterations",
            Self::MaxRuntime => "max_runtime",
            Self::MaxCost => "max_cost",
            Self::ConsecutiveFailures => "consecutive_failures",
            Self::LoopThrashing => "loop_thrashing",
            Self::LoopStale => "loop_stale",
            Self::ValidationFailure => "validation_failure",
            Self::Stopped => "stopped",
            Self::Interrupted => "interrupted",
            Self::RestartRequested => "restart_requested",
            Self::WorkspaceGone => "workspace_gone",
            Self::Cancelled => "cancelled",
        }
    }

    /// Returns true only for a successful completion (`CompletionPromise`);
    /// every other reason, including `Cancelled`, yields false.
    pub fn is_success(&self) -> bool {
        *self == Self::CompletionPromise
    }
}
129
/// The main event loop orchestrator.
pub struct EventLoop {
    /// Configuration the loop was constructed with (exposed via `config()`).
    config: RalphConfig,
    /// Registry of custom hats built from the configuration.
    registry: HatRegistry,
    /// Event bus on which hats are registered and events are published.
    bus: EventBus,
    /// Mutable loop state (iteration count, failure counters, pending
    /// completion/cancellation flags, ...).
    state: LoopState,
    /// Instruction builder created from the core and events configuration.
    instruction_builder: InstructionBuilder,
    /// The always-present Ralph coordinator; also stores the loop objective.
    ralph: HatlessRalph,
    /// Cached human guidance messages that should persist across iterations.
    robot_guidance: Vec<String>,
    /// Event reader for consuming events from JSONL file.
    /// Made pub(crate) to allow tests to override the path.
    pub(crate) event_reader: EventReader,
    /// Collector for orchestration and hook telemetry diagnostics.
    diagnostics: crate::diagnostics::DiagnosticsCollector,
    /// Loop context for path resolution (None for legacy single-loop mode).
    loop_context: Option<LoopContext>,
    /// Skill registry for the current loop.
    skill_registry: SkillRegistry,
    /// Robot service for human-in-the-loop communication.
    /// Injected externally when `human.enabled` is true and this is the primary loop.
    robot_service: Option<Box<dyn RobotService>>,
}
152
153impl EventLoop {
154    /// Creates a new event loop from configuration.
155    pub fn new(config: RalphConfig) -> Self {
156        // Try to create diagnostics collector, but fall back to disabled if it fails
157        // (e.g., in tests without proper directory setup)
158        let diagnostics = crate::diagnostics::DiagnosticsCollector::new(std::path::Path::new("."))
159            .unwrap_or_else(|e| {
160                debug!(
161                    "Failed to initialize diagnostics: {}, using disabled collector",
162                    e
163                );
164                crate::diagnostics::DiagnosticsCollector::disabled()
165            });
166
167        Self::with_diagnostics(config, diagnostics)
168    }
169
170    /// Creates a new event loop with a loop context for path resolution.
171    ///
172    /// The loop context determines where events, tasks, and other state files
173    /// are located. Use this for multi-loop scenarios where each loop runs
174    /// in an isolated workspace (git worktree).
175    pub fn with_context(config: RalphConfig, context: LoopContext) -> Self {
176        let diagnostics = crate::diagnostics::DiagnosticsCollector::new(context.workspace())
177            .unwrap_or_else(|e| {
178                debug!(
179                    "Failed to initialize diagnostics: {}, using disabled collector",
180                    e
181                );
182                crate::diagnostics::DiagnosticsCollector::disabled()
183            });
184
185        Self::with_context_and_diagnostics(config, context, diagnostics)
186    }
187
188    /// Creates a new event loop with explicit loop context and diagnostics.
189    pub fn with_context_and_diagnostics(
190        config: RalphConfig,
191        context: LoopContext,
192        diagnostics: crate::diagnostics::DiagnosticsCollector,
193    ) -> Self {
194        let registry = HatRegistry::from_config(&config);
195        let instruction_builder =
196            InstructionBuilder::with_events(config.core.clone(), config.events.clone());
197
198        let mut bus = EventBus::new();
199
200        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
201        // Ralph is ALWAYS registered as the universal fallback for orphaned events.
202        // Custom hats are registered first (higher priority), Ralph catches everything else.
203        for hat in registry.all() {
204            bus.register(hat.clone());
205        }
206
207        // Always register Ralph as catch-all coordinator
208        // Per spec: "Ralph runs when no hat triggered — Universal fallback for orphaned events"
209        let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); // Subscribe to all events
210        bus.register(ralph_hat);
211
212        if registry.is_empty() {
213            debug!("Solo mode: Ralph is the only coordinator");
214        } else {
215            debug!(
216                "Multi-hat mode: {} custom hats + Ralph as fallback",
217                registry.len()
218            );
219        }
220
221        // Build skill registry from config
222        let skill_registry = if config.skills.enabled {
223            SkillRegistry::from_config(
224                &config.skills,
225                context.workspace(),
226                Some(config.cli.backend.as_str()),
227            )
228            .unwrap_or_else(|e| {
229                warn!(
230                    "Failed to build skill registry: {}, using empty registry",
231                    e
232                );
233                SkillRegistry::new(Some(config.cli.backend.as_str()))
234            })
235        } else {
236            SkillRegistry::new(Some(config.cli.backend.as_str()))
237        };
238
239        let skill_index = if config.skills.enabled {
240            skill_registry.build_index(None)
241        } else {
242            String::new()
243        };
244
245        // When memories are enabled, add tasks CLI instructions alongside scratchpad
246        let ralph = HatlessRalph::new(
247            config.event_loop.completion_promise.clone(),
248            config.core.clone(),
249            &registry,
250            config.event_loop.starting_event.clone(),
251        )
252        .with_memories_enabled(config.memories.enabled)
253        .with_skill_index(skill_index);
254
255        // Read timestamped events path from marker file, fall back to default
256        // The marker file contains a relative path like ".ralph/events-20260127-123456.jsonl"
257        // which we resolve relative to the workspace root
258        let events_path = std::fs::read_to_string(context.current_events_marker())
259            .map(|s| {
260                let relative = s.trim();
261                context.workspace().join(relative)
262            })
263            .unwrap_or_else(|_| context.events_path());
264        let event_reader = EventReader::new(&events_path);
265
266        Self {
267            config,
268            registry,
269            bus,
270            state: LoopState::new(),
271            instruction_builder,
272            ralph,
273            robot_guidance: Vec::new(),
274            event_reader,
275            diagnostics,
276            loop_context: Some(context),
277            skill_registry,
278            robot_service: None,
279        }
280    }
281
282    /// Creates a new event loop with explicit diagnostics collector (for testing).
283    pub fn with_diagnostics(
284        config: RalphConfig,
285        diagnostics: crate::diagnostics::DiagnosticsCollector,
286    ) -> Self {
287        let registry = HatRegistry::from_config(&config);
288        let instruction_builder =
289            InstructionBuilder::with_events(config.core.clone(), config.events.clone());
290
291        let mut bus = EventBus::new();
292
293        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
294        // Ralph is ALWAYS registered as the universal fallback for orphaned events.
295        // Custom hats are registered first (higher priority), Ralph catches everything else.
296        for hat in registry.all() {
297            bus.register(hat.clone());
298        }
299
300        // Always register Ralph as catch-all coordinator
301        // Per spec: "Ralph runs when no hat triggered — Universal fallback for orphaned events"
302        let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); // Subscribe to all events
303        bus.register(ralph_hat);
304
305        if registry.is_empty() {
306            debug!("Solo mode: Ralph is the only coordinator");
307        } else {
308            debug!(
309                "Multi-hat mode: {} custom hats + Ralph as fallback",
310                registry.len()
311            );
312        }
313
314        // Build skill registry from config
315        let workspace_root = std::path::Path::new(".");
316        let skill_registry = if config.skills.enabled {
317            SkillRegistry::from_config(
318                &config.skills,
319                workspace_root,
320                Some(config.cli.backend.as_str()),
321            )
322            .unwrap_or_else(|e| {
323                warn!(
324                    "Failed to build skill registry: {}, using empty registry",
325                    e
326                );
327                SkillRegistry::new(Some(config.cli.backend.as_str()))
328            })
329        } else {
330            SkillRegistry::new(Some(config.cli.backend.as_str()))
331        };
332
333        let skill_index = if config.skills.enabled {
334            skill_registry.build_index(None)
335        } else {
336            String::new()
337        };
338
339        // When memories are enabled, add tasks CLI instructions alongside scratchpad
340        let ralph = HatlessRalph::new(
341            config.event_loop.completion_promise.clone(),
342            config.core.clone(),
343            &registry,
344            config.event_loop.starting_event.clone(),
345        )
346        .with_memories_enabled(config.memories.enabled)
347        .with_skill_index(skill_index);
348
349        // Read events path from marker file, fall back to default if not present
350        // The marker file is written by run_loop_impl() at run startup
351        let events_path = std::fs::read_to_string(".ralph/current-events")
352            .map(|s| s.trim().to_string())
353            .unwrap_or_else(|_| ".ralph/events.jsonl".to_string());
354        let event_reader = EventReader::new(&events_path);
355
356        Self {
357            config,
358            registry,
359            bus,
360            state: LoopState::new(),
361            instruction_builder,
362            ralph,
363            robot_guidance: Vec::new(),
364            event_reader,
365            diagnostics,
366            loop_context: None,
367            skill_registry,
368            robot_service: None,
369        }
370    }
371
    /// Injects a robot service for human-in-the-loop communication.
    ///
    /// Call this after construction to enable `human.interact` event handling,
    /// periodic check-ins, and question/response flow. The service is typically
    /// created by the CLI layer (e.g., `TelegramService`) and injected here,
    /// keeping the core event loop decoupled from any specific communication
    /// platform.
    ///
    /// Any previously injected service is replaced.
    pub fn set_robot_service(&mut self, service: Box<dyn RobotService>) {
        self.robot_service = Some(service);
    }
382
    /// Returns the loop context, if one was provided.
    ///
    /// `None` means the loop was built without a context (legacy single-loop mode).
    pub fn loop_context(&self) -> Option<&LoopContext> {
        self.loop_context.as_ref()
    }
387
388    /// Returns the tasks path based on loop context or default.
389    fn tasks_path(&self) -> PathBuf {
390        self.loop_context
391            .as_ref()
392            .map(|ctx| ctx.tasks_path())
393            .unwrap_or_else(|| PathBuf::from(".ralph/agent/tasks.jsonl"))
394    }
395
396    /// Returns the scratchpad path based on loop context or config.
397    fn scratchpad_path(&self) -> PathBuf {
398        self.loop_context
399            .as_ref()
400            .map(|ctx| ctx.scratchpad_path())
401            .unwrap_or_else(|| PathBuf::from(&self.config.core.scratchpad))
402    }
403
    /// Returns a shared reference to the current loop state.
    pub fn state(&self) -> &LoopState {
        &self.state
    }
408
    /// Returns the configuration this loop was constructed with.
    pub fn config(&self) -> &RalphConfig {
        &self.config
    }
413
    /// Returns the hat registry built from the configuration.
    pub fn registry(&self) -> &HatRegistry {
        &self.registry
    }
418
    /// Records hook telemetry for diagnostics.
    ///
    /// Forwards `entry` unchanged to the diagnostics collector.
    pub fn log_hook_run_telemetry(&self, entry: crate::diagnostics::HookRunTelemetryEntry) {
        self.diagnostics.log_hook_run(entry);
    }
423
424    /// Gets the backend configuration for a hat.
425    ///
426    /// If the hat has a backend configured, returns that.
427    /// Otherwise, returns None (caller should use global backend).
428    pub fn get_hat_backend(&self, hat_id: &HatId) -> Option<&HatBackend> {
429        self.registry
430            .get_config(hat_id)
431            .and_then(|config| config.backend.as_ref())
432    }
433
    /// Adds an observer that receives all published events.
    ///
    /// Multiple observers can be added (e.g., session recorder + TUI).
    /// Each observer is called before events are routed to subscribers.
    ///
    /// The observer is handed to the event bus, which owns it from then on.
    pub fn add_observer<F>(&mut self, observer: F)
    where
        F: Fn(&Event) + Send + 'static,
    {
        self.bus.add_observer(observer);
    }
444
    /// Sets a single observer, clearing any existing observers.
    ///
    /// Prefer `add_observer` when multiple observers are needed.
    #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
    pub fn set_observer<F>(&mut self, observer: F)
    where
        F: Fn(&Event) + Send + 'static,
    {
        // The bus-level setter is presumably deprecated too; the lint is
        // silenced only for this forwarding call.
        #[allow(deprecated)]
        self.bus.set_observer(observer);
    }
456
457    /// Checks if any termination condition is met.
458    pub fn check_termination(&self) -> Option<TerminationReason> {
459        let cfg = &self.config.event_loop;
460
461        if self.state.iteration >= cfg.max_iterations {
462            return Some(TerminationReason::MaxIterations);
463        }
464
465        if self.state.elapsed().as_secs() >= cfg.max_runtime_seconds {
466            return Some(TerminationReason::MaxRuntime);
467        }
468
469        if let Some(max_cost) = cfg.max_cost_usd
470            && self.state.cumulative_cost >= max_cost
471        {
472            return Some(TerminationReason::MaxCost);
473        }
474
475        if self.state.consecutive_failures >= cfg.max_consecutive_failures {
476            return Some(TerminationReason::ConsecutiveFailures);
477        }
478
479        // Check for loop thrashing: planner keeps dispatching abandoned tasks
480        if self.state.abandoned_task_redispatches >= 3 {
481            return Some(TerminationReason::LoopThrashing);
482        }
483
484        // Check for validation failures: too many consecutive malformed JSONL lines
485        if self.state.consecutive_malformed_events >= 3 {
486            return Some(TerminationReason::ValidationFailure);
487        }
488
489        // Check for stale loop: same topic emitted 3+ times in a row
490        if self.state.consecutive_same_topic >= 3 {
491            warn!(
492                topic = self.state.last_emitted_topic.as_deref().unwrap_or("?"),
493                count = self.state.consecutive_same_topic,
494                "Stale loop detected: same topic emitted consecutively"
495            );
496            return Some(TerminationReason::LoopStale);
497        }
498
499        // Check for stop signal from Telegram /stop or CLI stop-requested
500        let stop_path =
501            std::path::Path::new(&self.config.core.workspace_root).join(".ralph/stop-requested");
502        if stop_path.exists() {
503            let _ = std::fs::remove_file(&stop_path);
504            return Some(TerminationReason::Stopped);
505        }
506
507        // Check for restart signal from Telegram /restart command
508        let restart_path =
509            std::path::Path::new(&self.config.core.workspace_root).join(".ralph/restart-requested");
510        if restart_path.exists() {
511            return Some(TerminationReason::RestartRequested);
512        }
513
514        // Check if workspace directory has been removed (zombie worktree detection)
515        if !std::path::Path::new(&self.config.core.workspace_root).is_dir() {
516            return Some(TerminationReason::WorkspaceGone);
517        }
518
519        None
520    }
521
522    /// Check if a loop.cancel event was detected.
523    ///
524    /// Unlike check_completion_event(), this does NOT validate required_events.
525    /// Cancellation is an explicit abort — it doesn't need the workflow to be complete.
526    pub fn check_cancellation_event(&mut self) -> Option<TerminationReason> {
527        if !self.state.cancellation_requested {
528            return None;
529        }
530        self.state.cancellation_requested = false;
531        info!("Loop cancelled gracefully via loop.cancel event");
532
533        self.diagnostics.log_orchestration(
534            self.state.iteration,
535            "loop",
536            crate::diagnostics::OrchestrationEvent::LoopTerminated {
537                reason: "cancelled".to_string(),
538            },
539        );
540
541        Some(TerminationReason::Cancelled)
542    }
543
544    /// Checks if a completion event was received and returns termination reason.
545    ///
546    /// Completion is only accepted via JSONL events (e.g., `ralph emit`).
547    pub fn check_completion_event(&mut self) -> Option<TerminationReason> {
548        if !self.state.completion_requested {
549            return None;
550        }
551
552        // Event chain validation: check required events were seen
553        let required = &self.config.event_loop.required_events;
554        if !required.is_empty() {
555            let missing = self.state.missing_required_events(required);
556            if !missing.is_empty() {
557                warn!(
558                    missing = ?missing,
559                    "Rejecting LOOP_COMPLETE: required events not seen during loop lifetime"
560                );
561                self.state.completion_requested = false;
562
563                // Inject task.resume so the loop continues
564                let resume_payload = format!(
565                    "LOOP_COMPLETE rejected: missing required events: {:?}. \
566                     The agent must complete all workflow phases before emitting LOOP_COMPLETE. \
567                     Use loop.cancel to abort the workflow instead.",
568                    missing
569                );
570                self.bus.publish(Event::new("task.resume", resume_payload));
571                return None;
572            }
573        }
574
575        self.state.completion_requested = false;
576
577        // In persistent mode, suppress completion and keep the loop alive
578        if self.config.event_loop.persistent {
579            info!("Completion event suppressed - persistent mode active, loop staying alive");
580
581            self.diagnostics.log_orchestration(
582                self.state.iteration,
583                "loop",
584                crate::diagnostics::OrchestrationEvent::LoopTerminated {
585                    reason: "completion_event_suppressed_persistent".to_string(),
586                },
587            );
588
589            // Inject a task.resume event so the loop continues with an idle prompt
590            let resume_event = Event::new(
591                "task.resume",
592                "Persistent mode: loop staying alive after completion signal. \
593                 Check for new tasks or await human guidance.",
594            );
595            self.bus.publish(resume_event);
596
597            return None;
598        }
599
600        // Log warning if tasks remain open (informational only)
601        if self.config.memories.enabled {
602            if let Ok(false) = self.verify_tasks_complete() {
603                let open_tasks = self.get_open_task_list();
604                warn!(
605                    open_tasks = ?open_tasks,
606                    "Completion event with {} open task(s) - trusting agent decision",
607                    open_tasks.len()
608                );
609            }
610        } else if let Ok(false) = self.verify_scratchpad_complete() {
611            warn!("Completion event with pending scratchpad tasks - trusting agent decision");
612        }
613
614        info!("Completion event detected - terminating");
615
616        // Log loop terminated
617        self.diagnostics.log_orchestration(
618            self.state.iteration,
619            "loop",
620            crate::diagnostics::OrchestrationEvent::LoopTerminated {
621                reason: "completion_event".to_string(),
622            },
623        );
624
625        Some(TerminationReason::CompletionPromise)
626    }
627
628    /// Initializes the loop by publishing the start event.
629    pub fn initialize(&mut self, prompt_content: &str) {
630        // Use configured starting_event or default to task.start for backward compatibility
631        let topic = self
632            .config
633            .event_loop
634            .starting_event
635            .clone()
636            .unwrap_or_else(|| "task.start".to_string());
637        self.initialize_with_topic(&topic, prompt_content);
638    }
639
    /// Initializes the loop for resume mode by publishing task.resume.
    ///
    /// Per spec: "User can run `ralph resume` to restart reading existing scratchpad."
    /// The planner should read the existing scratchpad rather than doing fresh gap analysis.
    ///
    /// `prompt_content` is published as the event payload and stored as the
    /// objective, exactly as in `initialize`.
    pub fn initialize_resume(&mut self, prompt_content: &str) {
        // Resume always uses task.resume regardless of starting_event config
        self.initialize_with_topic("task.resume", prompt_content);
    }
648
649    /// Common initialization logic with configurable topic.
650    fn initialize_with_topic(&mut self, topic: &str, prompt_content: &str) {
651        // Store the objective so it persists across all iterations.
652        // After iteration 1, bus.take_pending() consumes the start event,
653        // so without this the objective would be invisible to later hats.
654        self.ralph.set_objective(prompt_content.to_string());
655
656        let start_event = Event::new(topic, prompt_content);
657        self.bus.publish(start_event);
658        debug!(topic = topic, "Published {} event", topic);
659    }
660
661    /// Gets the next hat to execute (if any have pending events).
662    ///
663    /// Per "Hatless Ralph" architecture: When custom hats are defined, Ralph is
664    /// always the executor. Custom hats define topology (pub/sub contracts) that
665    /// Ralph uses for coordination context, but Ralph handles all iterations.
666    ///
667    /// - Solo mode (no custom hats): Returns "ralph" if Ralph has pending events
668    /// - Multi-hat mode (custom hats defined): Always returns "ralph" if ANY hat has pending events
669    pub fn next_hat(&self) -> Option<&HatId> {
670        let next = self.bus.next_hat_with_pending();
671
672        // If no pending hat events but human interactions are pending, route to Ralph.
673        if next.is_none() && self.bus.has_human_pending() {
674            return self.bus.hat_ids().find(|id| id.as_str() == "ralph");
675        }
676
677        // If no pending events, return None
678        next.as_ref()?;
679
680        // In multi-hat mode, always route to Ralph (custom hats define topology only)
681        // Ralph's prompt includes the ## HATS section for coordination awareness
682        if self.registry.is_empty() {
683            // Solo mode - return the next hat (which is "ralph")
684            next
685        } else {
686            // Return "ralph" - the constant coordinator
687            // Find ralph in the bus's registered hats
688            self.bus.hat_ids().find(|id| id.as_str() == "ralph")
689        }
690    }
691
692    /// Checks if any hats have pending events.
693    ///
694    /// Use this after `process_output` to detect if the LLM failed to publish an event.
695    /// If false after processing, the loop will terminate on the next iteration.
696    pub fn has_pending_events(&self) -> bool {
697        self.bus.next_hat_with_pending().is_some() || self.bus.has_human_pending()
698    }
699
    /// Checks if any pending events are human-related (human.response, human.guidance).
    ///
    /// Used to skip cooldown delays when a human event is next, since we don't
    /// want to artificially delay the response to a human interaction.
    ///
    /// This simply forwards the event bus's own human-pending check.
    pub fn has_pending_human_events(&self) -> bool {
        self.bus.has_human_pending()
    }
707
708    /// Returns whether unread JSONL events include any semantic `plan.*` topics.
709    ///
710    /// This allows callers to dispatch `pre.plan.created` hooks before
711    /// event publication handling without consuming unread events.
712    pub fn has_pending_plan_events_in_jsonl(&self) -> std::io::Result<bool> {
713        let result = self.event_reader.peek_new_events()?;
714        Ok(result
715            .events
716            .iter()
717            .any(|event| event.topic.starts_with("plan.")))
718    }
719
720    /// Returns structured context for the first unread `human.interact` event,
721    /// if one is present in JSONL without consuming reader state.
722    pub fn pending_human_interact_context_in_jsonl(&self) -> std::io::Result<Option<Value>> {
723        let result = self.event_reader.peek_new_events()?;
724        Ok(result
725            .events
726            .iter()
727            .find(|event| event.topic == "human.interact")
728            .map(|event| {
729                Self::parse_human_interact_context(event.payload.as_deref().unwrap_or_default())
730            }))
731    }
732
733    /// Gets the topics a hat is allowed to publish.
734    ///
735    /// Used to build retry prompts when the LLM forgets to publish an event.
736    pub fn get_hat_publishes(&self, hat_id: &HatId) -> Vec<String> {
737        self.registry
738            .get(hat_id)
739            .map(|hat| hat.publishes.iter().map(|t| t.to_string()).collect())
740            .unwrap_or_default()
741    }
742
743    /// Injects a fallback event to recover from a stalled loop.
744    ///
745    /// When no hats have pending events (agent failed to publish), this method
746    /// injects a `task.resume` event which Ralph will handle to attempt recovery.
747    ///
748    /// Returns true if a fallback event was injected, false if recovery is not possible.
749    pub fn inject_fallback_event(&mut self) -> bool {
750        let fallback_event = Event::new(
751            "task.resume",
752            "RECOVERY: Previous iteration did not publish an event. \
753             Review the scratchpad and either dispatch the next task or complete the loop.",
754        );
755
756        // If a custom hat was last executing, target the fallback back to it
757        // This preserves hat context instead of always falling back to Ralph
758        let fallback_event = match &self.state.last_hat {
759            Some(hat_id) if hat_id.as_str() != "ralph" => {
760                debug!(
761                    hat = %hat_id.as_str(),
762                    "Injecting fallback event to recover - targeting last hat with task.resume"
763                );
764                fallback_event.with_target(hat_id.clone())
765            }
766            _ => {
767                debug!("Injecting fallback event to recover - triggering Ralph with task.resume");
768                fallback_event
769            }
770        };
771
772        self.bus.publish(fallback_event);
773        true
774    }
775
776    /// Builds the prompt for a hat's execution.
777    ///
778    /// Per "Hatless Ralph" architecture:
779    /// - Solo mode: Ralph handles everything with his own prompt
780    /// - Multi-hat mode: Ralph is the sole executor, custom hats define topology only
781    ///
782    /// When in multi-hat mode, this method collects ALL pending events across all hats
783    /// and builds Ralph's prompt with that context. The `## HATS` section in Ralph's
784    /// prompt documents the topology for coordination awareness.
785    ///
786    /// If memories are configured with `inject: auto`, this method also prepends
787    /// primed memories to the prompt context. If a scratchpad file exists and is
788    /// non-empty, its content is also prepended (before memories).
789    pub fn build_prompt(&mut self, hat_id: &HatId) -> Option<String> {
790        // Handle "ralph" hat - the constant coordinator
791        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
792        if hat_id.as_str() == "ralph" {
793            if self.registry.is_empty() {
794                // Solo mode - just Ralph's events, no hats to filter
795                let mut events = self.bus.take_pending(&hat_id.clone());
796                let mut human_events = self.bus.take_human_pending();
797                events.append(&mut human_events);
798
799                // Separate human.guidance events from regular events
800                let (guidance_events, regular_events): (Vec<_>, Vec<_>) = events
801                    .into_iter()
802                    .partition(|e| e.topic.as_str() == "human.guidance");
803
804                let events_context = regular_events
805                    .iter()
806                    .map(|e| Self::format_event(e))
807                    .collect::<Vec<_>>()
808                    .join("\n");
809
810                // Persist and inject human guidance into prompt if present
811                self.update_robot_guidance(guidance_events);
812                self.apply_robot_guidance();
813
814                // Build base prompt and prepend memories + scratchpad + ready tasks
815                let base_prompt = self.ralph.build_prompt(&events_context, &[]);
816                self.ralph.clear_robot_guidance();
817                let with_skills = self.prepend_auto_inject_skills(base_prompt);
818                let with_scratchpad = self.prepend_scratchpad(with_skills);
819                let final_prompt = self.prepend_ready_tasks(with_scratchpad);
820
821                debug!("build_prompt: routing to HatlessRalph (solo mode)");
822                return Some(final_prompt);
823            } else {
824                // Multi-hat mode: collect events and determine active hats
825                let mut all_hat_ids: Vec<HatId> = self.bus.hat_ids().cloned().collect();
826                // Deterministic ordering (avoid HashMap iteration order nondeterminism).
827                all_hat_ids.sort_by(|a, b| a.as_str().cmp(b.as_str()));
828
829                let mut all_events = Vec::new();
830                let mut system_events = Vec::new();
831
832                for id in &all_hat_ids {
833                    let pending = self.bus.take_pending(id);
834                    if pending.is_empty() {
835                        continue;
836                    }
837
838                    let (drop_pending, exhausted_event) = self.check_hat_exhaustion(id, &pending);
839                    if drop_pending {
840                        // Drop the pending events that would have activated the hat.
841                        if let Some(exhausted_event) = exhausted_event {
842                            all_events.push(exhausted_event.clone());
843                            system_events.push(exhausted_event);
844                        }
845                        continue;
846                    }
847
848                    all_events.extend(pending);
849                }
850
851                let mut human_events = self.bus.take_human_pending();
852                all_events.append(&mut human_events);
853
854                // Publish orchestrator-generated system events after consuming pending events,
855                // so they become visible in the event log and can be handled next iteration.
856                for event in system_events {
857                    self.bus.publish(event);
858                }
859
860                // Separate human.guidance events from regular events
861                let (guidance_events, regular_events): (Vec<_>, Vec<_>) = all_events
862                    .into_iter()
863                    .partition(|e| e.topic.as_str() == "human.guidance");
864
865                // Persist and inject human guidance before building prompt (must happen before
866                // immutable borrows from determine_active_hats)
867                self.update_robot_guidance(guidance_events);
868                self.apply_robot_guidance();
869
870                // Determine which hats are active based on regular events
871                let active_hat_ids = self.determine_active_hat_ids(&regular_events);
872                self.record_hat_activations(&active_hat_ids);
873                self.state.last_active_hat_ids = active_hat_ids.clone();
874                let active_hats = self.determine_active_hats(&regular_events);
875
876                // Format events for context
877                let events_context = regular_events
878                    .iter()
879                    .map(|e| Self::format_event(e))
880                    .collect::<Vec<_>>()
881                    .join("\n");
882
883                // Build base prompt and prepend memories + scratchpad if available
884                let base_prompt = self.ralph.build_prompt(&events_context, &active_hats);
885
886                // Build prompt with active hats - filters instructions to only active hats
887                debug!(
888                    "build_prompt: routing to HatlessRalph (multi-hat coordinator mode), active_hats: {:?}",
889                    active_hats
890                        .iter()
891                        .map(|h| h.id.as_str())
892                        .collect::<Vec<_>>()
893                );
894
895                // Clear guidance after active_hats references are no longer needed
896                self.ralph.clear_robot_guidance();
897                let with_skills = self.prepend_auto_inject_skills(base_prompt);
898                let with_scratchpad = self.prepend_scratchpad(with_skills);
899                let final_prompt = self.prepend_ready_tasks(with_scratchpad);
900
901                return Some(final_prompt);
902            }
903        }
904
905        // Non-ralph hat requested - this shouldn't happen in multi-hat mode since
906        // next_hat() always returns "ralph" when custom hats are defined.
907        // But we keep this code path for backward compatibility and tests.
908        let events = self.bus.take_pending(&hat_id.clone());
909        let events_context = events
910            .iter()
911            .map(|e| Self::format_event(e))
912            .collect::<Vec<_>>()
913            .join("\n");
914
915        let hat = self.registry.get(hat_id)?;
916
917        // Debug logging to trace hat routing
918        debug!(
919            "build_prompt: hat_id='{}', instructions.is_empty()={}",
920            hat_id.as_str(),
921            hat.instructions.is_empty()
922        );
923
924        // All hats use build_custom_hat with ghuntley-style prompts
925        debug!(
926            "build_prompt: routing to build_custom_hat() for '{}'",
927            hat_id.as_str()
928        );
929        Some(
930            self.instruction_builder
931                .build_custom_hat(hat, &events_context),
932        )
933    }
934
935    /// Stores guidance payloads, persists them to scratchpad, and prepares them for prompt injection.
936    ///
937    /// Guidance events are ephemeral in the event bus (consumed by `take_pending`).
938    /// This method both caches them in memory for prompt injection and appends
939    /// them to the scratchpad file so they survive across process restarts.
940    fn update_robot_guidance(&mut self, guidance_events: Vec<Event>) {
941        if guidance_events.is_empty() {
942            return;
943        }
944
945        // Persist new guidance to scratchpad before caching
946        self.persist_guidance_to_scratchpad(&guidance_events);
947
948        self.robot_guidance
949            .extend(guidance_events.into_iter().map(|e| e.payload));
950    }
951
952    /// Appends human guidance entries to the scratchpad file for durability.
953    ///
954    /// Each guidance message is written as a timestamped markdown entry so it
955    /// appears alongside the agent's own thinking and survives process restarts.
956    fn persist_guidance_to_scratchpad(&self, guidance_events: &[Event]) {
957        use std::io::Write;
958
959        let scratchpad_path = self.scratchpad_path();
960        let resolved_path = if scratchpad_path.is_relative() {
961            self.config.core.workspace_root.join(&scratchpad_path)
962        } else {
963            scratchpad_path
964        };
965
966        // Create parent directories if needed
967        if let Some(parent) = resolved_path.parent()
968            && !parent.exists()
969            && let Err(e) = std::fs::create_dir_all(parent)
970        {
971            warn!("Failed to create scratchpad directory: {}", e);
972            return;
973        }
974
975        let mut file = match std::fs::OpenOptions::new()
976            .create(true)
977            .append(true)
978            .open(&resolved_path)
979        {
980            Ok(f) => f,
981            Err(e) => {
982                warn!("Failed to open scratchpad for guidance persistence: {}", e);
983                return;
984            }
985        };
986
987        let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
988        for event in guidance_events {
989            let entry = format!(
990                "\n### HUMAN GUIDANCE ({})\n\n{}\n",
991                timestamp, event.payload
992            );
993            if let Err(e) = file.write_all(entry.as_bytes()) {
994                warn!("Failed to write guidance to scratchpad: {}", e);
995            }
996        }
997
998        info!(
999            count = guidance_events.len(),
1000            "Persisted human guidance to scratchpad"
1001        );
1002    }
1003
1004    /// Injects cached guidance into the next prompt build.
1005    fn apply_robot_guidance(&mut self) {
1006        if self.robot_guidance.is_empty() {
1007            return;
1008        }
1009
1010        self.ralph.set_robot_guidance(self.robot_guidance.clone());
1011    }
1012
1013    /// Prepends auto-injected skill content to the prompt.
1014    ///
1015    /// This generalizes the former `prepend_memories()` into a skill auto-injection
1016    /// pipeline that handles memories, tools, and any other auto-inject skills.
1017    ///
1018    /// Injection order:
1019    /// 1. Memory data + ralph-tools skill (special case: loads memory data from store, applies budget)
1020    /// 2. RObot interaction skill (gated by `robot.enabled`)
1021    /// 3. Other auto-inject skills from the registry (wrapped in XML tags)
1022    fn prepend_auto_inject_skills(&self, prompt: String) -> String {
1023        let mut prefix = String::new();
1024
1025        // 1. Memory data + ralph-tools skill — special case with data loading
1026        self.inject_memories_and_tools_skill(&mut prefix);
1027
1028        // 2. RObot interaction skill — gated by robot.enabled
1029        self.inject_robot_skill(&mut prefix);
1030
1031        // 3. Other auto-inject skills from the registry
1032        self.inject_custom_auto_skills(&mut prefix);
1033
1034        if prefix.is_empty() {
1035            return prompt;
1036        }
1037
1038        prefix.push_str("\n\n");
1039        prefix.push_str(&prompt);
1040        prefix
1041    }
1042
1043    /// Injects memory data and the ralph-tools skill into the prefix.
1044    ///
1045    /// Special case: loads memory entries from the store, applies budget
1046    /// truncation, then appends the ralph-tools skill content (which covers
1047    /// both tasks and memories CLI usage).
1048    /// Memory data is gated by `memories.enabled && memories.inject == Auto`.
1049    /// The ralph-tools skill is injected when either memories or tasks are enabled.
1050    fn inject_memories_and_tools_skill(&self, prefix: &mut String) {
1051        let memories_config = &self.config.memories;
1052
1053        // Inject memory DATA if memories are enabled with auto-inject
1054        if memories_config.enabled && memories_config.inject == InjectMode::Auto {
1055            info!(
1056                "Memory injection check: enabled={}, inject={:?}, workspace_root={:?}",
1057                memories_config.enabled, memories_config.inject, self.config.core.workspace_root
1058            );
1059
1060            let workspace_root = &self.config.core.workspace_root;
1061            let store = MarkdownMemoryStore::with_default_path(workspace_root);
1062            let memories_path = workspace_root.join(".ralph/agent/memories.md");
1063
1064            info!(
1065                "Looking for memories at: {:?} (exists: {})",
1066                memories_path,
1067                memories_path.exists()
1068            );
1069
1070            let memories = match store.load() {
1071                Ok(memories) => {
1072                    info!("Successfully loaded {} memories from store", memories.len());
1073                    memories
1074                }
1075                Err(e) => {
1076                    info!(
1077                        "Failed to load memories for injection: {} (path: {:?})",
1078                        e, memories_path
1079                    );
1080                    Vec::new()
1081                }
1082            };
1083
1084            if memories.is_empty() {
1085                info!("Memory store is empty - no memories to inject");
1086            } else {
1087                let mut memories_content = format_memories_as_markdown(&memories);
1088
1089                if memories_config.budget > 0 {
1090                    let original_len = memories_content.len();
1091                    memories_content =
1092                        truncate_to_budget(&memories_content, memories_config.budget);
1093                    debug!(
1094                        "Applied budget: {} chars -> {} chars (budget: {})",
1095                        original_len,
1096                        memories_content.len(),
1097                        memories_config.budget
1098                    );
1099                }
1100
1101                info!(
1102                    "Injecting {} memories ({} chars) into prompt",
1103                    memories.len(),
1104                    memories_content.len()
1105                );
1106
1107                prefix.push_str(&memories_content);
1108            }
1109        }
1110
1111        // Inject the ralph-tools skill when either memories or tasks are enabled
1112        if memories_config.enabled || self.config.tasks.enabled {
1113            if let Some(skill) = self.skill_registry.get("ralph-tools") {
1114                if !prefix.is_empty() {
1115                    prefix.push_str("\n\n");
1116                }
1117                prefix.push_str(&format!(
1118                    "<ralph-tools-skill>\n{}\n</ralph-tools-skill>",
1119                    skill.content.trim()
1120                ));
1121                debug!("Injected ralph-tools skill from registry");
1122            } else {
1123                debug!("ralph-tools skill not found in registry - skill content not injected");
1124            }
1125        }
1126    }
1127
1128    /// Injects the RObot interaction skill content into the prefix.
1129    ///
1130    /// Gated by `robot.enabled`. Teaches agents how and when to interact
1131    /// with humans via `human.interact` events.
1132    fn inject_robot_skill(&self, prefix: &mut String) {
1133        if !self.config.robot.enabled {
1134            return;
1135        }
1136
1137        if let Some(skill) = self.skill_registry.get("robot-interaction") {
1138            if !prefix.is_empty() {
1139                prefix.push_str("\n\n");
1140            }
1141            prefix.push_str(&format!(
1142                "<robot-skill>\n{}\n</robot-skill>",
1143                skill.content.trim()
1144            ));
1145            debug!("Injected robot interaction skill from registry");
1146        }
1147    }
1148
1149    /// Injects any user-configured auto-inject skills (excluding built-in ralph-tools/robot-interaction).
1150    fn inject_custom_auto_skills(&self, prefix: &mut String) {
1151        for skill in self.skill_registry.auto_inject_skills(None) {
1152            // Skip built-in skills handled above
1153            if skill.name == "ralph-tools" || skill.name == "robot-interaction" {
1154                continue;
1155            }
1156
1157            if !prefix.is_empty() {
1158                prefix.push_str("\n\n");
1159            }
1160            prefix.push_str(&format!(
1161                "<{name}-skill>\n{content}\n</{name}-skill>",
1162                name = skill.name,
1163                content = skill.content.trim()
1164            ));
1165            debug!("Injected auto-inject skill: {}", skill.name);
1166        }
1167    }
1168
1169    /// Prepends scratchpad content to the prompt if the file exists and is non-empty.
1170    ///
1171    /// The scratchpad is the agent's working memory for the current objective.
1172    /// Auto-injecting saves one tool call per iteration.
1173    /// When the file exceeds the budget, the TAIL is kept (most recent entries).
1174    fn prepend_scratchpad(&self, prompt: String) -> String {
1175        let scratchpad_path = self.scratchpad_path();
1176
1177        let resolved_path = if scratchpad_path.is_relative() {
1178            self.config.core.workspace_root.join(&scratchpad_path)
1179        } else {
1180            scratchpad_path
1181        };
1182
1183        if !resolved_path.exists() {
1184            debug!(
1185                "Scratchpad not found at {:?}, skipping injection",
1186                resolved_path
1187            );
1188            return prompt;
1189        }
1190
1191        let content = match std::fs::read_to_string(&resolved_path) {
1192            Ok(c) => c,
1193            Err(e) => {
1194                info!("Failed to read scratchpad for injection: {}", e);
1195                return prompt;
1196            }
1197        };
1198
1199        if content.trim().is_empty() {
1200            debug!("Scratchpad is empty, skipping injection");
1201            return prompt;
1202        }
1203
1204        // Budget: 4000 tokens ~16000 chars. Keep the TAIL (most recent content).
1205        let char_budget = 4000 * 4;
1206        let content = if content.len() > char_budget {
1207            // Find a line boundary near the start of the tail
1208            let start = content.len() - char_budget;
1209            // Ensure we start at a valid UTF-8 character boundary
1210            let start = floor_char_boundary(&content, start);
1211            let line_start = content[start..].find('\n').map_or(start, |n| start + n + 1);
1212            let discarded = &content[..line_start];
1213
1214            // Summarize discarded content by extracting markdown headings
1215            let headings: Vec<&str> = discarded
1216                .lines()
1217                .filter(|line| line.starts_with('#'))
1218                .collect();
1219            let summary = if headings.is_empty() {
1220                format!(
1221                    "<!-- earlier content truncated ({} chars omitted) -->",
1222                    line_start
1223                )
1224            } else {
1225                format!(
1226                    "<!-- earlier content truncated ({} chars omitted) -->\n\
1227                     <!-- discarded sections: {} -->",
1228                    line_start,
1229                    headings.join(" | ")
1230                )
1231            };
1232
1233            format!("{}\n\n{}", summary, &content[line_start..])
1234        } else {
1235            content
1236        };
1237
1238        info!("Injecting scratchpad ({} chars) into prompt", content.len());
1239
1240        let mut final_prompt = format!(
1241            "<scratchpad path=\"{}\">\n{}\n</scratchpad>\n\n",
1242            self.config.core.scratchpad, content
1243        );
1244        final_prompt.push_str(&prompt);
1245        final_prompt
1246    }
1247
1248    /// Prepends ready tasks to the prompt if tasks are enabled and any exist.
1249    ///
1250    /// Loads the task store and formats ready (unblocked, open) tasks into
1251    /// a `<ready-tasks>` XML block. This saves the agent a tool call per
1252    /// iteration and puts tasks at the same prominence as the scratchpad.
1253    fn prepend_ready_tasks(&self, prompt: String) -> String {
1254        if !self.config.tasks.enabled {
1255            return prompt;
1256        }
1257
1258        use crate::task::TaskStatus;
1259        use crate::task_store::TaskStore;
1260
1261        let tasks_path = self.tasks_path();
1262        let resolved_path = if tasks_path.is_relative() {
1263            self.config.core.workspace_root.join(&tasks_path)
1264        } else {
1265            tasks_path
1266        };
1267
1268        if !resolved_path.exists() {
1269            return prompt;
1270        }
1271
1272        let store = match TaskStore::load(&resolved_path) {
1273            Ok(s) => s,
1274            Err(e) => {
1275                info!("Failed to load task store for injection: {}", e);
1276                return prompt;
1277            }
1278        };
1279
1280        let ready = store.ready();
1281        let open = store.open();
1282        let closed_count = store.all().len() - open.len();
1283
1284        if open.is_empty() && closed_count == 0 {
1285            return prompt;
1286        }
1287
1288        let mut section = String::from("<ready-tasks>\n");
1289        if ready.is_empty() && open.is_empty() {
1290            section.push_str("No open tasks. Create tasks with `ralph tools task add`.\n");
1291        } else {
1292            section.push_str(&format!(
1293                "## Tasks: {} ready, {} open, {} closed\n\n",
1294                ready.len(),
1295                open.len(),
1296                closed_count
1297            ));
1298            for task in &ready {
1299                let status_icon = match task.status {
1300                    TaskStatus::Open => "[ ]",
1301                    TaskStatus::InProgress => "[~]",
1302                    _ => "[?]",
1303                };
1304                section.push_str(&format!(
1305                    "- {} [P{}] {} ({})\n",
1306                    status_icon, task.priority, task.title, task.id
1307                ));
1308            }
1309            // Show blocked tasks separately so agent knows they exist
1310            let ready_ids: Vec<&str> = ready.iter().map(|t| t.id.as_str()).collect();
1311            let blocked: Vec<_> = open
1312                .iter()
1313                .filter(|t| !ready_ids.contains(&t.id.as_str()))
1314                .collect();
1315            if !blocked.is_empty() {
1316                section.push_str("\nBlocked:\n");
1317                for task in blocked {
1318                    section.push_str(&format!(
1319                        "- [blocked] [P{}] {} ({}) — blocked by: {}\n",
1320                        task.priority,
1321                        task.title,
1322                        task.id,
1323                        task.blocked_by.join(", ")
1324                    ));
1325                }
1326            }
1327        }
1328        section.push_str("</ready-tasks>\n\n");
1329
1330        info!(
1331            "Injecting ready tasks ({} ready, {} open, {} closed) into prompt",
1332            ready.len(),
1333            open.len(),
1334            closed_count
1335        );
1336
1337        let mut final_prompt = section;
1338        final_prompt.push_str(&prompt);
1339        final_prompt
1340    }
1341
    /// Builds the Ralph prompt (coordination mode).
    ///
    /// Thin wrapper that forwards `prompt_content` to `self.ralph.build_prompt`
    /// together with an empty active-hat list, and returns the resulting prompt.
    pub fn build_ralph_prompt(&self, prompt_content: &str) -> String {
        self.ralph.build_prompt(prompt_content, &[])
    }
1346
1347    /// Determines which hats should be active based on pending events.
1348    /// Returns list of Hat references that are triggered by any pending event.
1349    fn determine_active_hats(&self, events: &[Event]) -> Vec<&Hat> {
1350        let mut active_hats = Vec::new();
1351        for id in self.determine_active_hat_ids(events) {
1352            if let Some(hat) = self.registry.get(&id) {
1353                active_hats.push(hat);
1354            }
1355        }
1356        active_hats
1357    }
1358
1359    fn determine_active_hat_ids(&self, events: &[Event]) -> Vec<HatId> {
1360        let mut active_hat_ids = Vec::new();
1361        for event in events {
1362            if let Some(hat) = self.registry.get_for_topic(event.topic.as_str()) {
1363                // Avoid duplicates
1364                if !active_hat_ids.iter().any(|id| id == &hat.id) {
1365                    active_hat_ids.push(hat.id.clone());
1366                }
1367            }
1368        }
1369        active_hat_ids
1370    }
1371
1372    /// Formats an event for prompt context.
1373    ///
1374    /// For top-level prompts (task.start, task.resume), wraps the payload in
1375    /// `<top-level-prompt>` XML tags to clearly delineate the user's original request.
1376    fn format_event(event: &Event) -> String {
1377        let topic = &event.topic;
1378        let payload = &event.payload;
1379
1380        if topic.as_str() == "task.start" || topic.as_str() == "task.resume" {
1381            format!(
1382                "Event: {} - <top-level-prompt>\n{}\n</top-level-prompt>",
1383                topic, payload
1384            )
1385        } else {
1386            format!("Event: {} - {}", topic, payload)
1387        }
1388    }
1389
1390    fn check_hat_exhaustion(&mut self, hat_id: &HatId, dropped: &[Event]) -> (bool, Option<Event>) {
1391        let Some(config) = self.registry.get_config(hat_id) else {
1392            return (false, None);
1393        };
1394        let Some(max) = config.max_activations else {
1395            return (false, None);
1396        };
1397
1398        let count = *self.state.hat_activation_counts.get(hat_id).unwrap_or(&0);
1399        if count < max {
1400            return (false, None);
1401        }
1402
1403        // Emit only once per hat per run (avoid flooding).
1404        let should_emit = self.state.exhausted_hats.insert(hat_id.clone());
1405
1406        if !should_emit {
1407            // Hat is already exhausted - drop pending events silently.
1408            return (true, None);
1409        }
1410
1411        let mut dropped_topics: Vec<String> = dropped.iter().map(|e| e.topic.to_string()).collect();
1412        dropped_topics.sort();
1413
1414        let payload = format!(
1415            "Hat '{hat}' exhausted.\n- max_activations: {max}\n- activations: {count}\n- dropped_topics:\n  - {topics}",
1416            hat = hat_id.as_str(),
1417            max = max,
1418            count = count,
1419            topics = dropped_topics.join("\n  - ")
1420        );
1421
1422        warn!(
1423            hat = %hat_id.as_str(),
1424            max_activations = max,
1425            activations = count,
1426            "Hat exhausted (max_activations reached)"
1427        );
1428
1429        (
1430            true,
1431            Some(Event::new(
1432                format!("{}.exhausted", hat_id.as_str()),
1433                payload,
1434            )),
1435        )
1436    }
1437
1438    fn record_hat_activations(&mut self, active_hat_ids: &[HatId]) {
1439        for hat_id in active_hat_ids {
1440            *self
1441                .state
1442                .hat_activation_counts
1443                .entry(hat_id.clone())
1444                .or_insert(0) += 1;
1445        }
1446    }
1447
1448    /// Returns the primary active hat ID for display purposes.
1449    /// Returns the first active hat, or "ralph" if no specific hat is active.
1450    /// BTreeMap iteration is already sorted by key.
1451    pub fn get_active_hat_id(&self) -> HatId {
1452        // Peek at pending events (don't consume them)
1453        for hat_id in self.bus.hat_ids() {
1454            let Some(events) = self.bus.peek_pending(hat_id) else {
1455                continue;
1456            };
1457            let Some(event) = events.first() else {
1458                continue;
1459            };
1460            if let Some(active_hat) = self.registry.get_for_topic(event.topic.as_str()) {
1461                return active_hat.id.clone();
1462            }
1463        }
1464        HatId::new("ralph")
1465    }
1466
1467    /// Injects a default event for a hat when the agent wrote no events.
1468    ///
1469    /// Call this after `process_events_from_jsonl` returns `Ok(false)` (no events found).
1470    /// If the hat has `default_publishes` configured, this injects the default event.
1471    ///
1472    /// If the default topic matches the completion promise, `completion_requested` is set
1473    /// so the loop can terminate. Without this, completion events injected via
1474    /// `default_publishes` would only be published to the bus (triggering downstream hats)
1475    /// but never detected by `check_completion_event`, causing an infinite loop.
1476    pub fn check_default_publishes(&mut self, hat_id: &HatId) {
1477        if let Some(config) = self.registry.get_config(hat_id)
1478            && let Some(default_topic) = &config.default_publishes
1479        {
1480            let default_event = Event::new(default_topic.as_str(), "").with_source(hat_id.clone());
1481
1482            debug!(
1483                hat = %hat_id.as_str(),
1484                topic = %default_topic,
1485                "No events written by hat, injecting default_publishes event"
1486            );
1487
1488            self.state.record_topic(default_topic.as_str());
1489
1490            // If the default topic is the completion promise, set the flag directly.
1491            // The normal path (process_events_from_jsonl) sets this when reading from
1492            // JSONL, but default_publishes bypasses JSONL entirely.
1493            if default_topic.as_str() == self.config.event_loop.completion_promise {
1494                info!(
1495                    hat = %hat_id.as_str(),
1496                    topic = %default_topic,
1497                    "default_publishes matches completion_promise — requesting termination"
1498                );
1499                self.state.completion_requested = true;
1500            }
1501
1502            self.bus.publish(default_event);
1503        }
1504    }
1505
1506    /// Returns a mutable reference to the event bus for direct event publishing.
1507    ///
1508    /// This is primarily used for planning sessions to inject user responses
1509    /// as events into the orchestration loop.
1510    pub fn bus(&mut self) -> &mut EventBus {
1511        &mut self.bus
1512    }
1513
1514    /// Processes output from a hat execution.
1515    ///
1516    /// Returns the termination reason if the loop should stop.
1517    pub fn process_output(
1518        &mut self,
1519        hat_id: &HatId,
1520        output: &str,
1521        success: bool,
1522    ) -> Option<TerminationReason> {
1523        self.state.iteration += 1;
1524        self.state.last_hat = Some(hat_id.clone());
1525
1526        // Periodic robot check-in
1527        if let Some(interval_secs) = self.config.robot.checkin_interval_seconds
1528            && let Some(ref robot_service) = self.robot_service
1529        {
1530            let elapsed = self.state.elapsed();
1531            let interval = std::time::Duration::from_secs(interval_secs);
1532            let last = self
1533                .state
1534                .last_checkin_at
1535                .map(|t| t.elapsed())
1536                .unwrap_or(elapsed);
1537
1538            if last >= interval {
1539                let context = self.build_checkin_context(hat_id);
1540                match robot_service.send_checkin(self.state.iteration, elapsed, Some(&context)) {
1541                    Ok(_) => {
1542                        self.state.last_checkin_at = Some(std::time::Instant::now());
1543                        debug!(iteration = self.state.iteration, "Sent robot check-in");
1544                    }
1545                    Err(e) => {
1546                        warn!(error = %e, "Failed to send robot check-in");
1547                    }
1548                }
1549            }
1550        }
1551
1552        // Log iteration started
1553        self.diagnostics.log_orchestration(
1554            self.state.iteration,
1555            "loop",
1556            crate::diagnostics::OrchestrationEvent::IterationStarted,
1557        );
1558
1559        // Log hat selected
1560        self.diagnostics.log_orchestration(
1561            self.state.iteration,
1562            "loop",
1563            crate::diagnostics::OrchestrationEvent::HatSelected {
1564                hat: hat_id.to_string(),
1565                reason: "process_output".to_string(),
1566            },
1567        );
1568
1569        // Track failures
1570        if success {
1571            self.state.consecutive_failures = 0;
1572        } else {
1573            self.state.consecutive_failures += 1;
1574        }
1575
1576        let _ = output;
1577
1578        // File-modification audit: detect when a hat with disallowed Edit/Write tools
1579        // modified files. This is hard enforcement — emits a scope_violation event.
1580        self.audit_file_modifications(hat_id);
1581
1582        // Events are ONLY read from the JSONL file written by `ralph emit`.
1583        // This enforces tool use and prevents confabulation (agent claiming to emit without actually doing so).
1584        // See process_events_from_jsonl() for event processing.
1585
1586        // Check termination conditions
1587        self.check_termination()
1588    }
1589
1590    /// Audits file modifications after a hat iteration.
1591    ///
1592    /// If the hat has `Edit` or `Write` in its `disallowed_tools`, checks whether
1593    /// files were modified (via `git diff --stat HEAD`). If so, emits a
1594    /// `<hat_id>.scope_violation` event.
1595    fn audit_file_modifications(&mut self, hat_id: &HatId) {
1596        let config = match self.registry.get_config(hat_id) {
1597            Some(c) => c,
1598            None => return,
1599        };
1600
1601        let has_write_restriction = config
1602            .disallowed_tools
1603            .iter()
1604            .any(|t| t == "Edit" || t == "Write");
1605
1606        if !has_write_restriction {
1607            return;
1608        }
1609
1610        let workspace = &self.config.core.workspace_root;
1611        let diff_output = std::process::Command::new("git")
1612            .args(["diff", "--stat", "HEAD"])
1613            .current_dir(workspace)
1614            .output();
1615
1616        match diff_output {
1617            Ok(output) if !output.stdout.is_empty() => {
1618                let diff_stat = String::from_utf8_lossy(&output.stdout).trim().to_string();
1619                warn!(
1620                    hat = %hat_id.as_str(),
1621                    diff = %diff_stat,
1622                    "Hat modified files despite tool restrictions (scope violation)"
1623                );
1624
1625                let violation_topic = format!("{}.scope_violation", hat_id.as_str());
1626                let violation = Event::new(
1627                    violation_topic.as_str(),
1628                    format!(
1629                        "Hat '{}' modified files with Edit/Write disallowed:\n{}",
1630                        hat_id.as_str(),
1631                        diff_stat
1632                    ),
1633                );
1634                self.bus.publish(violation);
1635            }
1636            Err(e) => {
1637                debug!(error = %e, "Could not run git diff for file-modification audit");
1638            }
1639            _ => {} // No modifications — all good
1640        }
1641    }
1642
1643    /// Extracts task identifier from build.blocked payload.
1644    /// Uses first line of payload as task ID.
1645    fn extract_task_id(payload: &str) -> String {
1646        payload
1647            .lines()
1648            .next()
1649            .unwrap_or("unknown")
1650            .trim()
1651            .to_string()
1652    }
1653
1654    /// Adds cost to the cumulative total.
1655    pub fn add_cost(&mut self, cost: f64) {
1656        self.state.cumulative_cost += cost;
1657    }
1658
1659    /// Verifies all tasks in scratchpad are complete or cancelled.
1660    ///
1661    /// Returns:
1662    /// - `Ok(true)` if all tasks are `[x]` or `[~]`
1663    /// - `Ok(false)` if any tasks are `[ ]` (pending)
1664    /// - `Err(...)` if scratchpad doesn't exist or can't be read
1665    fn verify_scratchpad_complete(&self) -> Result<bool, std::io::Error> {
1666        let scratchpad_path = self.scratchpad_path();
1667
1668        if !scratchpad_path.exists() {
1669            return Err(std::io::Error::new(
1670                std::io::ErrorKind::NotFound,
1671                "Scratchpad does not exist",
1672            ));
1673        }
1674
1675        let content = std::fs::read_to_string(scratchpad_path)?;
1676
1677        let has_pending = content
1678            .lines()
1679            .any(|line| line.trim_start().starts_with("- [ ]"));
1680
1681        Ok(!has_pending)
1682    }
1683
1684    fn verify_tasks_complete(&self) -> Result<bool, std::io::Error> {
1685        use crate::task_store::TaskStore;
1686
1687        let tasks_path = self.tasks_path();
1688
1689        // No tasks file = no pending tasks = complete
1690        if !tasks_path.exists() {
1691            return Ok(true);
1692        }
1693
1694        let store = TaskStore::load(&tasks_path)?;
1695        Ok(!store.has_pending_tasks())
1696    }
1697
1698    /// Builds a [`CheckinContext`] with current loop state for robot check-ins.
1699    fn build_checkin_context(&self, hat_id: &HatId) -> CheckinContext {
1700        let (open_tasks, closed_tasks) = self.count_tasks();
1701        CheckinContext {
1702            current_hat: Some(hat_id.as_str().to_string()),
1703            open_tasks,
1704            closed_tasks,
1705            cumulative_cost: self.state.cumulative_cost,
1706        }
1707    }
1708
1709    /// Counts open and closed tasks from the task store.
1710    ///
1711    /// Returns `(open_count, closed_count)`. "Open" means non-terminal tasks,
1712    /// "closed" means tasks with `TaskStatus::Closed`.
1713    fn count_tasks(&self) -> (usize, usize) {
1714        use crate::task::TaskStatus;
1715        use crate::task_store::TaskStore;
1716
1717        let tasks_path = self.tasks_path();
1718        if !tasks_path.exists() {
1719            return (0, 0);
1720        }
1721
1722        match TaskStore::load(&tasks_path) {
1723            Ok(store) => {
1724                let total = store.all().len();
1725                let open = store.open().len();
1726                let closed = total - open;
1727                // Verify: closed should match Closed status count
1728                debug_assert_eq!(
1729                    closed,
1730                    store
1731                        .all()
1732                        .iter()
1733                        .filter(|t| t.status == TaskStatus::Closed)
1734                        .count()
1735                );
1736                (open, closed)
1737            }
1738            Err(_) => (0, 0),
1739        }
1740    }
1741
1742    /// Returns a list of open task descriptions for logging purposes.
1743    fn get_open_task_list(&self) -> Vec<String> {
1744        use crate::task_store::TaskStore;
1745
1746        let tasks_path = self.tasks_path();
1747        if let Ok(store) = TaskStore::load(&tasks_path) {
1748            return store
1749                .open()
1750                .iter()
1751                .map(|t| format!("{}: {}", t.id, t.title))
1752                .collect();
1753        }
1754        vec![]
1755    }
1756
1757    fn warn_on_mutation_evidence(&self, evidence: &crate::event_parser::BackpressureEvidence) {
1758        let threshold = self.config.event_loop.mutation_score_warn_threshold;
1759
1760        match &evidence.mutants {
1761            Some(mutants) => {
1762                if let Some(reason) = Self::mutation_warning_reason(mutants, threshold) {
1763                    warn!(
1764                        reason = %reason,
1765                        mutants_status = ?mutants.status,
1766                        mutants_score = mutants.score_percent,
1767                        mutants_threshold = threshold,
1768                        "Mutation testing warning"
1769                    );
1770                }
1771            }
1772            None => {
1773                if let Some(threshold) = threshold {
1774                    warn!(
1775                        mutants_threshold = threshold,
1776                        "Mutation testing warning: missing mutation evidence in build.done payload"
1777                    );
1778                }
1779            }
1780        }
1781    }
1782
1783    fn mutation_warning_reason(
1784        mutants: &MutationEvidence,
1785        threshold: Option<f64>,
1786    ) -> Option<String> {
1787        match mutants.status {
1788            MutationStatus::Fail => Some("mutation testing failed".to_string()),
1789            MutationStatus::Warn => Some(Self::format_mutation_message(
1790                "mutation score below threshold",
1791                mutants.score_percent,
1792            )),
1793            MutationStatus::Unknown => Some("mutation testing status unknown".to_string()),
1794            MutationStatus::Pass => {
1795                let threshold = threshold?;
1796
1797                match mutants.score_percent {
1798                    Some(score) if score < threshold => Some(format!(
1799                        "mutation score {:.2}% below threshold {:.2}%",
1800                        score, threshold
1801                    )),
1802                    Some(_) => None,
1803                    None => Some(format!(
1804                        "mutation score missing (threshold {:.2}%)",
1805                        threshold
1806                    )),
1807                }
1808            }
1809        }
1810    }
1811
1812    fn format_mutation_message(message: &str, score: Option<f64>) -> String {
1813        match score {
1814            Some(score) => format!("{message} ({score:.2}%)"),
1815            None => message.to_string(),
1816        }
1817    }
1818
1819    fn parse_human_interact_context(payload: &str) -> Value {
1820        let mut context = match serde_json::from_str::<Value>(payload) {
1821            Ok(Value::Object(map)) => map,
1822            Ok(value) => {
1823                let mut map = Map::new();
1824                map.insert("question".to_string(), value);
1825                map
1826            }
1827            Err(_) => {
1828                let mut map = Map::new();
1829                map.insert("question".to_string(), Value::String(payload.to_string()));
1830                map
1831            }
1832        };
1833
1834        if !context.contains_key("question") {
1835            context.insert("question".to_string(), Value::String(payload.to_string()));
1836        }
1837
1838        Value::Object(context)
1839    }
1840
1841    /// Processes events from JSONL and routes orphaned events to Ralph.
1842    ///
1843    /// Also handles backpressure for malformed JSONL lines by:
1844    /// 1. Emitting `event.malformed` system events for each parse failure
1845    /// 2. Tracking consecutive failures for termination check
1846    /// 3. Resetting counter when valid events are parsed
1847    ///
1848    /// Returns [`ProcessedEvents`] indicating whether events were found, whether
1849    /// semantic `plan.*` topics were published, structured `human.interact`
1850    /// context/outcome metadata, and whether any were orphans that Ralph should
1851    /// handle.
1852    pub fn process_events_from_jsonl(&mut self) -> std::io::Result<ProcessedEvents> {
1853        let result = self.event_reader.read_new_events()?;
1854
1855        // Handle malformed lines with backpressure
1856        for malformed in &result.malformed {
1857            let payload = format!(
1858                "Line {}: {}\nContent: {}",
1859                malformed.line_number, malformed.error, &malformed.content
1860            );
1861            let event = Event::new("event.malformed", &payload);
1862            self.bus.publish(event);
1863            self.state.consecutive_malformed_events += 1;
1864            warn!(
1865                line = malformed.line_number,
1866                consecutive = self.state.consecutive_malformed_events,
1867                "Malformed event line detected"
1868            );
1869        }
1870
1871        // Reset counter when valid events are parsed
1872        if !result.events.is_empty() {
1873            self.state.consecutive_malformed_events = 0;
1874        }
1875
1876        if result.events.is_empty() && result.malformed.is_empty() {
1877            return Ok(ProcessedEvents {
1878                had_events: false,
1879                had_plan_events: false,
1880                human_interact_context: None,
1881                has_orphans: false,
1882            });
1883        }
1884
1885        // --- Scope enforcement: filter events against active hat's publishes ---
1886        // Only active when enforce_hat_scope is true in config (opt-in).
1887        let events = if self.config.event_loop.enforce_hat_scope {
1888            let active_hats = self.state.last_active_hat_ids.clone();
1889            let (in_scope, out_of_scope): (Vec<_>, Vec<_>) =
1890                result.events.into_iter().partition(|event| {
1891                    if active_hats.is_empty() {
1892                        return true; // Ralph coordinating — no scope restriction
1893                    }
1894                    active_hats
1895                        .iter()
1896                        .any(|hat_id| self.registry.can_publish(hat_id, event.topic.as_str()))
1897                });
1898
1899            for event in &out_of_scope {
1900                let violation_hat = active_hats.first().map(|h| h.as_str()).unwrap_or("unknown");
1901                warn!(
1902                    active_hats = ?active_hats,
1903                    topic = %event.topic,
1904                    "Scope violation: active hat(s) cannot publish this topic — dropping event"
1905                );
1906                let violation_topic = format!("{}.scope_violation", violation_hat);
1907                let violation_payload = format!(
1908                    "Attempted to publish '{}': {}",
1909                    event.topic,
1910                    event.payload.clone().unwrap_or_default()
1911                );
1912                let violation = Event::new(violation_topic, violation_payload);
1913                self.bus.publish(violation);
1914            }
1915
1916            in_scope
1917        } else {
1918            result.events
1919        };
1920        // --- End scope enforcement ---
1921
1922        let mut has_orphans = false;
1923
1924        // Validate and transform events (apply backpressure for build.done)
1925        let mut validated_events = Vec::new();
1926        let completion_topic = self.config.event_loop.completion_promise.as_str();
1927        let cancellation_topic = self.config.event_loop.cancellation_promise.clone();
1928        let total_events = events.len();
1929        for (index, event) in events.into_iter().enumerate() {
1930            let payload = event.payload.clone().unwrap_or_default();
1931
1932            // Detect loop.cancel — unconditional graceful termination
1933            if !cancellation_topic.is_empty() && event.topic.as_str() == cancellation_topic {
1934                info!(
1935                    payload = %payload,
1936                    "loop.cancel event detected — scheduling graceful termination"
1937                );
1938                self.state.cancellation_requested = true;
1939                // Continue processing remaining events (they may contain cleanup info)
1940                continue;
1941            }
1942
1943            if event.topic == completion_topic {
1944                if index + 1 == total_events {
1945                    self.state.completion_requested = true;
1946                    self.diagnostics.log_orchestration(
1947                        self.state.iteration,
1948                        "jsonl",
1949                        crate::diagnostics::OrchestrationEvent::EventPublished {
1950                            topic: event.topic.clone(),
1951                        },
1952                    );
1953                    info!(
1954                        topic = %event.topic,
1955                        "Completion event detected in JSONL"
1956                    );
1957                } else {
1958                    warn!(
1959                        topic = %event.topic,
1960                        index = index,
1961                        total_events = total_events,
1962                        "Completion event ignored because it was not the last event"
1963                    );
1964                }
1965                continue;
1966            }
1967
1968            if event.topic == "build.done" {
1969                // Validate build.done events have backpressure evidence
1970                if let Some(evidence) = EventParser::parse_backpressure_evidence(&payload) {
1971                    if evidence.all_passed() {
1972                        self.warn_on_mutation_evidence(&evidence);
1973                        validated_events.push(Event::new(event.topic.as_str(), &payload));
1974                    } else {
1975                        // Evidence present but checks failed - synthesize build.blocked
1976                        warn!(
1977                            tests = evidence.tests_passed,
1978                            lint = evidence.lint_passed,
1979                            typecheck = evidence.typecheck_passed,
1980                            audit = evidence.audit_passed,
1981                            coverage = evidence.coverage_passed,
1982                            complexity = evidence.complexity_score,
1983                            duplication = evidence.duplication_passed,
1984                            performance = evidence.performance_regression,
1985                            specs = evidence.specs_verified,
1986                            "build.done rejected: backpressure checks failed"
1987                        );
1988
1989                        let complexity = evidence
1990                            .complexity_score
1991                            .map(|value| format!("{value:.2}"))
1992                            .unwrap_or_else(|| "missing".to_string());
1993                        let performance = match evidence.performance_regression {
1994                            Some(true) => "regression".to_string(),
1995                            Some(false) => "pass".to_string(),
1996                            None => "missing".to_string(),
1997                        };
1998                        let specs = match evidence.specs_verified {
1999                            Some(true) => "pass".to_string(),
2000                            Some(false) => "fail".to_string(),
2001                            None => "not reported".to_string(),
2002                        };
2003
2004                        self.diagnostics.log_orchestration(
2005                            self.state.iteration,
2006                            "jsonl",
2007                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2008                                reason: format!(
2009                                    "backpressure checks failed: tests={}, lint={}, typecheck={}, audit={}, coverage={}, complexity={}, duplication={}, performance={}, specs={}",
2010                                    evidence.tests_passed,
2011                                    evidence.lint_passed,
2012                                    evidence.typecheck_passed,
2013                                    evidence.audit_passed,
2014                                    evidence.coverage_passed,
2015                                    complexity,
2016                                    evidence.duplication_passed,
2017                                    performance,
2018                                    specs
2019                                ),
2020                            },
2021                        );
2022
2023                        validated_events.push(Event::new(
2024                            "build.blocked",
2025                            "Backpressure checks failed. Fix tests/lint/typecheck/audit/coverage/complexity/duplication/specs before emitting build.done.",
2026                        ));
2027                    }
2028                } else {
2029                    // No evidence found - synthesize build.blocked
2030                    warn!("build.done rejected: missing backpressure evidence");
2031
2032                    self.diagnostics.log_orchestration(
2033                        self.state.iteration,
2034                        "jsonl",
2035                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2036                            reason: "missing backpressure evidence".to_string(),
2037                        },
2038                    );
2039
2040                    validated_events.push(Event::new(
2041                        "build.blocked",
2042                        "Missing backpressure evidence. Include 'tests: pass', 'lint: pass', 'typecheck: pass', 'audit: pass', 'coverage: pass', 'complexity: <score>', 'duplication: pass', 'performance: pass' (optional), 'specs: pass' (optional) in build.done payload.",
2043                    ));
2044                }
2045            } else if event.topic == "review.done" {
2046                // Validate review.done events have verification evidence
2047                if let Some(evidence) = EventParser::parse_review_evidence(&payload) {
2048                    if evidence.is_verified() {
2049                        validated_events.push(Event::new(event.topic.as_str(), &payload));
2050                    } else {
2051                        // Evidence present but checks failed - synthesize review.blocked
2052                        warn!(
2053                            tests = evidence.tests_passed,
2054                            build = evidence.build_passed,
2055                            "review.done rejected: verification checks failed"
2056                        );
2057
2058                        self.diagnostics.log_orchestration(
2059                            self.state.iteration,
2060                            "jsonl",
2061                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2062                                reason: format!(
2063                                    "review verification failed: tests={}, build={}",
2064                                    evidence.tests_passed, evidence.build_passed
2065                                ),
2066                            },
2067                        );
2068
2069                        validated_events.push(Event::new(
2070                            "review.blocked",
2071                            "Review verification failed. Run tests and build before emitting review.done.",
2072                        ));
2073                    }
2074                } else {
2075                    // No evidence found - synthesize review.blocked
2076                    warn!("review.done rejected: missing verification evidence");
2077
2078                    self.diagnostics.log_orchestration(
2079                        self.state.iteration,
2080                        "jsonl",
2081                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2082                            reason: "missing review verification evidence".to_string(),
2083                        },
2084                    );
2085
2086                    validated_events.push(Event::new(
2087                        "review.blocked",
2088                        "Missing verification evidence. Include 'tests: pass' and 'build: pass' in review.done payload.",
2089                    ));
2090                }
2091            } else if event.topic == "verify.passed" {
2092                if let Some(report) = EventParser::parse_quality_report(&payload) {
2093                    if report.meets_thresholds() {
2094                        validated_events.push(Event::new(event.topic.as_str(), &payload));
2095                    } else {
2096                        let failed = report.failed_dimensions();
2097                        let reason = if failed.is_empty() {
2098                            "quality thresholds failed".to_string()
2099                        } else {
2100                            format!("quality thresholds failed: {}", failed.join(", "))
2101                        };
2102
2103                        warn!(
2104                            failed_dimensions = ?failed,
2105                            "verify.passed rejected: quality thresholds failed"
2106                        );
2107
2108                        self.diagnostics.log_orchestration(
2109                            self.state.iteration,
2110                            "jsonl",
2111                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2112                                reason,
2113                            },
2114                        );
2115
2116                        validated_events.push(Event::new(
2117                            "verify.failed",
2118                            "Quality thresholds failed. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity with thresholds in verify.passed payload.",
2119                        ));
2120                    }
2121                } else {
2122                    // No quality report found - synthesize verify.failed
2123                    warn!("verify.passed rejected: missing quality report");
2124
2125                    self.diagnostics.log_orchestration(
2126                        self.state.iteration,
2127                        "jsonl",
2128                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2129                            reason: "missing quality report".to_string(),
2130                        },
2131                    );
2132
2133                    validated_events.push(Event::new(
2134                        "verify.failed",
2135                        "Missing quality report. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity in verify.passed payload.",
2136                    ));
2137                }
2138            } else if event.topic == "verify.failed" {
2139                if EventParser::parse_quality_report(&payload).is_none() {
2140                    warn!("verify.failed missing quality report");
2141                }
2142                validated_events.push(Event::new(event.topic.as_str(), &payload));
2143            } else {
2144                // Non-backpressure events pass through unchanged
2145                validated_events.push(Event::new(event.topic.as_str(), &payload));
2146            }
2147        }
2148
2149        // Track build.blocked events for thrashing detection
2150        let blocked_events: Vec<_> = validated_events
2151            .iter()
2152            .filter(|e| e.topic == "build.blocked".into())
2153            .collect();
2154
2155        for blocked_event in &blocked_events {
2156            let task_id = Self::extract_task_id(&blocked_event.payload);
2157
2158            let count = self
2159                .state
2160                .task_block_counts
2161                .entry(task_id.clone())
2162                .or_insert(0);
2163            *count += 1;
2164
2165            debug!(
2166                task_id = %task_id,
2167                block_count = *count,
2168                "Task blocked"
2169            );
2170
2171            // After 3 blocks on same task, emit build.task.abandoned
2172            if *count >= 3 && !self.state.abandoned_tasks.contains(&task_id) {
2173                warn!(
2174                    task_id = %task_id,
2175                    "Task abandoned after 3 consecutive blocks"
2176                );
2177
2178                self.state.abandoned_tasks.push(task_id.clone());
2179
2180                self.diagnostics.log_orchestration(
2181                    self.state.iteration,
2182                    "jsonl",
2183                    crate::diagnostics::OrchestrationEvent::TaskAbandoned {
2184                        reason: format!(
2185                            "3 consecutive build.blocked events for task '{}'",
2186                            task_id
2187                        ),
2188                    },
2189                );
2190
2191                let abandoned_event = Event::new(
2192                    "build.task.abandoned",
2193                    format!(
2194                        "Task '{}' abandoned after 3 consecutive build.blocked events",
2195                        task_id
2196                    ),
2197                );
2198
2199                self.bus.publish(abandoned_event);
2200            }
2201        }
2202
2203        // Track hat-level blocking for legacy thrashing detection
2204        let has_blocked_event = !blocked_events.is_empty();
2205
2206        if has_blocked_event {
2207            self.state.consecutive_blocked += 1;
2208        } else {
2209            self.state.consecutive_blocked = 0;
2210            self.state.last_blocked_hat = None;
2211        }
2212
2213        // Handle human.interact blocking behavior:
2214        // When a human.interact event is detected and robot service is active,
2215        // send the question and block until human.response or timeout.
2216        let mut response_event = None;
2217        let mut human_interact_context = None;
2218        let ask_human_idx = validated_events
2219            .iter()
2220            .position(|e| e.topic == "human.interact".into());
2221
2222        if let Some(idx) = ask_human_idx {
2223            let ask_event = &validated_events[idx];
2224            let payload = ask_event.payload.clone();
2225
2226            let mut context = match Self::parse_human_interact_context(&payload) {
2227                Value::Object(map) => map,
2228                _ => Map::new(),
2229            };
2230
2231            if let Some(ref robot_service) = self.robot_service {
2232                info!(
2233                    payload = %payload,
2234                    "human.interact event detected — sending question via robot service"
2235                );
2236
2237                // Send the question (includes retry with exponential backoff)
2238                let send_ok = match robot_service.send_question(&payload) {
2239                    Ok(_message_id) => true,
2240                    Err(e) => {
2241                        warn!(
2242                            error = %e,
2243                            "Failed to send human.interact question after retries — treating as timeout"
2244                        );
2245                        // Log to diagnostics
2246                        self.diagnostics.log_error(
2247                            self.state.iteration,
2248                            "telegram",
2249                            crate::diagnostics::DiagnosticError::TelegramSendError {
2250                                operation: "send_question".to_string(),
2251                                error: e.to_string(),
2252                                retry_count: 3,
2253                            },
2254                        );
2255                        context.insert(
2256                            "outcome".to_string(),
2257                            Value::String("send_failure".to_string()),
2258                        );
2259                        context.insert("error".to_string(), Value::String(e.to_string()));
2260                        false
2261                    }
2262                };
2263
2264                // Block: poll events file for human.response
2265                // Per spec, even on send failure we treat as timeout (continue without blocking)
2266                if send_ok {
2267                    // Read the active events path from the current-events marker,
2268                    // falling back to the default events.jsonl if not available.
2269                    let events_path = self
2270                        .loop_context
2271                        .as_ref()
2272                        .and_then(|ctx| {
2273                            std::fs::read_to_string(ctx.current_events_marker())
2274                                .ok()
2275                                .map(|s| ctx.workspace().join(s.trim()))
2276                        })
2277                        .or_else(|| {
2278                            std::fs::read_to_string(".ralph/current-events")
2279                                .ok()
2280                                .map(|s| PathBuf::from(s.trim()))
2281                        })
2282                        .unwrap_or_else(|| {
2283                            self.loop_context
2284                                .as_ref()
2285                                .map(|ctx| ctx.events_path())
2286                                .unwrap_or_else(|| PathBuf::from(".ralph/events.jsonl"))
2287                        });
2288
2289                    match robot_service.wait_for_response(&events_path) {
2290                        Ok(Some(response)) => {
2291                            info!(
2292                                response = %response,
2293                                "Received human.response — continuing loop"
2294                            );
2295                            context.insert(
2296                                "outcome".to_string(),
2297                                Value::String("response".to_string()),
2298                            );
2299                            context.insert("response".to_string(), Value::String(response.clone()));
2300                            // Create a human.response event to inject into the bus
2301                            response_event = Some(Event::new("human.response", &response));
2302                        }
2303                        Ok(None) => {
2304                            warn!(
2305                                timeout_secs = robot_service.timeout_secs(),
2306                                "Human response timeout — injecting human.timeout event"
2307                            );
2308                            context.insert(
2309                                "outcome".to_string(),
2310                                Value::String("timeout".to_string()),
2311                            );
2312                            context.insert(
2313                                "timeout_seconds".to_string(),
2314                                Value::from(robot_service.timeout_secs()),
2315                            );
2316                            let timeout_event = Event::new(
2317                                "human.timeout",
2318                                format!(
2319                                    "No response after {}s. Original question: {}",
2320                                    robot_service.timeout_secs(),
2321                                    payload
2322                                ),
2323                            );
2324                            response_event = Some(timeout_event);
2325                        }
2326                        Err(e) => {
2327                            warn!(
2328                                error = %e,
2329                                "Error waiting for human response — injecting human.timeout event"
2330                            );
2331                            context.insert(
2332                                "outcome".to_string(),
2333                                Value::String("wait_error".to_string()),
2334                            );
2335                            context.insert("error".to_string(), Value::String(e.to_string()));
2336                            let timeout_event = Event::new(
2337                                "human.timeout",
2338                                format!(
2339                                    "Error waiting for response: {}. Original question: {}",
2340                                    e, payload
2341                                ),
2342                            );
2343                            response_event = Some(timeout_event);
2344                        }
2345                    }
2346                }
2347            } else {
2348                debug!(
2349                    "human.interact event detected but no robot service active — passing through"
2350                );
2351                context.insert(
2352                    "outcome".to_string(),
2353                    Value::String("no_robot_service".to_string()),
2354                );
2355            }
2356
2357            human_interact_context = Some(Value::Object(context));
2358        }
2359
2360        // Track whether any events will be published (before the loop consumes them).
2361        let had_events = !validated_events.is_empty();
2362        let had_plan_events = validated_events
2363            .iter()
2364            .any(|event| event.topic.as_str().starts_with("plan."));
2365
2366        // Publish validated events to the bus.
2367        // Ralph is always registered with subscribe("*"), so every event has at least
2368        // one subscriber. Events without a specific hat subscriber are "orphaned" —
2369        // Ralph handles them as the universal fallback.
2370        for event in validated_events {
2371            // Record topic for event chain validation
2372            self.state.record_topic(event.topic.as_str());
2373
2374            self.diagnostics.log_orchestration(
2375                self.state.iteration,
2376                "jsonl",
2377                crate::diagnostics::OrchestrationEvent::EventPublished {
2378                    topic: event.topic.to_string(),
2379                },
2380            );
2381
2382            if !self.registry.has_subscriber(event.topic.as_str()) {
2383                has_orphans = true;
2384            }
2385
2386            debug!(
2387                topic = %event.topic,
2388                "Publishing event from JSONL"
2389            );
2390            self.bus.publish(event);
2391        }
2392
2393        // Publish human.response event if one was received during blocking
2394        if let Some(response) = response_event {
2395            self.state.record_topic(response.topic.as_str());
2396            info!(
2397                topic = %response.topic,
2398                "Publishing human.response event from robot service"
2399            );
2400            self.bus.publish(response);
2401        }
2402
2403        Ok(ProcessedEvents {
2404            had_events,
2405            had_plan_events,
2406            human_interact_context,
2407            has_orphans,
2408        })
2409    }
2410
2411    /// Checks if output contains a completion event from Ralph.
2412    ///
2413    /// Completion must be emitted as an `<event>` tag, not plain text.
2414    pub fn check_ralph_completion(&self, output: &str) -> bool {
2415        let events = EventParser::new().parse(output);
2416        events
2417            .iter()
2418            .any(|event| event.topic.as_str() == self.config.event_loop.completion_promise)
2419    }
2420
2421    /// Publishes the loop.terminate system event to observers.
2422    ///
2423    /// Per spec: "Published by the orchestrator (not agents) when the loop exits."
2424    /// This is an observer-only event—hats cannot trigger on it.
2425    ///
2426    /// Returns the event for logging purposes.
2427    pub fn publish_terminate_event(&mut self, reason: &TerminationReason) -> Event {
2428        // Stop the robot service if it was running
2429        self.stop_robot_service();
2430
2431        let elapsed = self.state.elapsed();
2432        let duration_str = format_duration(elapsed);
2433
2434        let payload = format!(
2435            "## Reason\n{}\n\n## Status\n{}\n\n## Summary\n- Iterations: {}\n- Duration: {}\n- Exit code: {}",
2436            reason.as_str(),
2437            termination_status_text(reason),
2438            self.state.iteration,
2439            duration_str,
2440            reason.exit_code()
2441        );
2442
2443        let event = Event::new("loop.terminate", &payload);
2444
2445        // Publish to bus for observers (but no hat can trigger on this)
2446        self.bus.publish(event.clone());
2447
2448        info!(
2449            reason = %reason.as_str(),
2450            iterations = self.state.iteration,
2451            duration = %duration_str,
2452            "Wrapping up: {}. {} iterations in {}.",
2453            reason.as_str(),
2454            self.state.iteration,
2455            duration_str
2456        );
2457
2458        event
2459    }
2460
2461    /// Returns the robot service's shutdown flag, if active.
2462    ///
2463    /// Signal handlers can set this flag to interrupt `wait_for_response()`
2464    /// without waiting for the full timeout.
2465    pub fn robot_shutdown_flag(&self) -> Option<Arc<AtomicBool>> {
2466        self.robot_service.as_ref().map(|s| s.shutdown_flag())
2467    }
2468
2469    /// Stops the robot service if it's running.
2470    ///
2471    /// Called during loop termination to cleanly shut down the communication backend.
2472    fn stop_robot_service(&mut self) {
2473        if let Some(service) = self.robot_service.take() {
2474            service.stop();
2475        }
2476    }
2477
2478    // -------------------------------------------------------------------------
2479    // Human-in-the-loop planning support
2480    // -------------------------------------------------------------------------
2481
2482    /// Check if any event is a `user.prompt` event.
2483    ///
2484    /// Returns the first user prompt event found, or None.
2485    pub fn check_for_user_prompt(&self, events: &[Event]) -> Option<UserPrompt> {
2486        events
2487            .iter()
2488            .find(|e| e.topic.as_str() == "user.prompt")
2489            .map(|e| UserPrompt {
2490                id: Self::extract_prompt_id(&e.payload),
2491                text: e.payload.clone(),
2492            })
2493    }
2494
2495    /// Extract a prompt ID from the event payload.
2496    ///
2497    /// Supports both XML attribute format: `<event topic="user.prompt" id="q1">...</event>`
2498    /// and JSON format in payload.
2499    fn extract_prompt_id(payload: &str) -> String {
2500        // Try to extract id attribute from XML-like format first
2501        if let Some(start) = payload.find("id=\"")
2502            && let Some(end) = payload[start + 4..].find('"')
2503        {
2504            return payload[start + 4..start + 4 + end].to_string();
2505        }
2506
2507        // Fallback: generate a simple ID based on timestamp
2508        format!("q{}", Self::generate_prompt_id())
2509    }
2510
2511    /// Generate a simple unique ID for prompts.
2512    /// Uses timestamp-based generation since uuid crate isn't available.
2513    fn generate_prompt_id() -> String {
2514        use std::time::{SystemTime, UNIX_EPOCH};
2515        let nanos = SystemTime::now()
2516            .duration_since(UNIX_EPOCH)
2517            .unwrap()
2518            .as_nanos();
2519        format!("{:x}", nanos % 0xFFFF_FFFF)
2520    }
2521}
2522
/// A question raised by the agent that needs an answer from a human.
///
/// Built from a `user.prompt` event emitted during planning.
#[derive(Clone, Debug)]
pub struct UserPrompt {
    /// Identifier of this prompt, e.g. "q1" or "q2".
    pub id: String,
    /// Text of the prompt or question.
    pub text: String,
}
2533
/// Renders a duration as a compact `h`/`m`/`s` string.
///
/// Only whole seconds are shown. Leading units that are zero are omitted, so
/// the result looks like `"42s"`, `"3m 5s"` or `"2h 0m 7s"`.
fn format_duration(d: Duration) -> String {
    const SECS_PER_MINUTE: u64 = 60;
    const SECS_PER_HOUR: u64 = 60 * SECS_PER_MINUTE;

    let whole_secs = d.as_secs();
    let (h, within_hour) = (whole_secs / SECS_PER_HOUR, whole_secs % SECS_PER_HOUR);
    let (m, s) = (within_hour / SECS_PER_MINUTE, within_hour % SECS_PER_MINUTE);

    match (h, m) {
        (0, 0) => format!("{s}s"),
        (0, _) => format!("{m}m {s}s"),
        _ => format!("{h}h {m}m {s}s"),
    }
}
2549
2550/// Returns a human-readable status based on termination reason.
2551fn termination_status_text(reason: &TerminationReason) -> &'static str {
2552    match reason {
2553        TerminationReason::CompletionPromise => "All tasks completed successfully.",
2554        TerminationReason::MaxIterations => "Stopped at iteration limit.",
2555        TerminationReason::MaxRuntime => "Stopped at runtime limit.",
2556        TerminationReason::MaxCost => "Stopped at cost limit.",
2557        TerminationReason::ConsecutiveFailures => "Too many consecutive failures.",
2558        TerminationReason::LoopThrashing => {
2559            "Loop thrashing detected - same hat repeatedly blocked."
2560        }
2561        TerminationReason::LoopStale => {
2562            "Stale loop detected - same topic emitted 3+ times consecutively."
2563        }
2564        TerminationReason::ValidationFailure => "Too many consecutive malformed JSONL events.",
2565        TerminationReason::Stopped => "Manually stopped.",
2566        TerminationReason::Interrupted => "Interrupted by signal.",
2567        TerminationReason::RestartRequested => "Restarting by human request.",
2568        TerminationReason::WorkspaceGone => "Workspace directory removed externally.",
2569        TerminationReason::Cancelled => "Cancelled gracefully (human rejection or timeout).",
2570    }
2571}