Skip to main content

ralph_core/event_loop/
mod.rs

1//! Event loop orchestration.
2//!
3//! The event loop coordinates the execution of hats via pub/sub messaging.
4
5mod loop_state;
6#[cfg(test)]
7mod tests;
8
9pub use loop_state::LoopState;
10
11use crate::config::{HatBackend, InjectMode, RalphConfig};
12use crate::event_parser::{EventParser, MutationEvidence, MutationStatus};
13use crate::event_reader::EventReader;
14use crate::hat_registry::HatRegistry;
15use crate::hatless_ralph::HatlessRalph;
16use crate::instructions::InstructionBuilder;
17use crate::loop_context::LoopContext;
18use crate::memory_store::{MarkdownMemoryStore, format_memories_as_markdown, truncate_to_budget};
19use crate::skill_registry::SkillRegistry;
20use ralph_proto::{CheckinContext, Event, EventBus, Hat, HatId, RobotService};
21use std::path::PathBuf;
22use std::sync::Arc;
23use std::sync::atomic::AtomicBool;
24use std::time::Duration;
25use tracing::{debug, info, warn};
26
/// Why the event loop stopped running.
///
/// Each variant maps to a process exit code ([`TerminationReason::exit_code`])
/// and to a stable string label ([`TerminationReason::as_str`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationReason {
    /// Completion promise was detected in output.
    CompletionPromise,
    /// Maximum iterations reached.
    MaxIterations,
    /// Maximum runtime exceeded.
    MaxRuntime,
    /// Maximum cost exceeded.
    MaxCost,
    /// Too many consecutive failures.
    ConsecutiveFailures,
    /// Loop thrashing detected (repeated blocked events).
    LoopThrashing,
    /// Too many consecutive malformed JSONL lines in events file.
    ValidationFailure,
    /// Manually stopped.
    Stopped,
    /// Interrupted by signal (SIGINT/SIGTERM).
    Interrupted,
    /// Restart requested via Telegram `/restart` command.
    RestartRequested,
}

impl TerminationReason {
    /// Returns the process exit code associated with this reason.
    ///
    /// - `0`: completion promise detected (success)
    /// - `1`: consecutive failures, loop thrashing, validation failure, or a
    ///   manual stop
    /// - `2`: max iterations, max runtime, or max cost exceeded (limit)
    /// - `3`: restart requested; signals the caller to exec-replace
    /// - `130`: user interrupt (SIGINT = 128 + 2)
    pub fn exit_code(&self) -> i32 {
        match self {
            Self::CompletionPromise => 0,
            Self::ConsecutiveFailures
            | Self::LoopThrashing
            | Self::ValidationFailure
            | Self::Stopped => 1,
            Self::MaxIterations | Self::MaxRuntime | Self::MaxCost => 2,
            Self::RestartRequested => 3,
            Self::Interrupted => 130,
        }
    }

    /// Returns the reason label for use in the `loop.terminate` event payload.
    ///
    /// Possible values:
    /// `completed | max_iterations | max_runtime | max_cost |
    /// consecutive_failures | loop_thrashing | validation_failure | stopped |
    /// interrupted | restart_requested`
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::CompletionPromise => "completed",
            Self::MaxIterations => "max_iterations",
            Self::MaxRuntime => "max_runtime",
            Self::MaxCost => "max_cost",
            Self::ConsecutiveFailures => "consecutive_failures",
            Self::LoopThrashing => "loop_thrashing",
            Self::ValidationFailure => "validation_failure",
            Self::Stopped => "stopped",
            Self::Interrupted => "interrupted",
            Self::RestartRequested => "restart_requested",
        }
    }

    /// Returns true only for [`TerminationReason::CompletionPromise`]; every
    /// other reason is an error, a limit, or an external stop/restart.
    pub fn is_success(&self) -> bool {
        *self == Self::CompletionPromise
    }
}
100
/// The main event loop orchestrator.
///
/// Owns the configuration, the hat registry, the event bus, and the mutable
/// loop state, and builds prompts through the constant `ralph` coordinator.
pub struct EventLoop {
    /// Configuration the loop was constructed with (see `config()`).
    config: RalphConfig,
    /// Hat registry built from `config` at construction time.
    registry: HatRegistry,
    /// Event bus; every registry hat plus the catch-all `ralph` hat is
    /// registered on it by the constructors.
    bus: EventBus,
    /// Mutable loop state (iteration count, failure counters, completion flag, ...).
    state: LoopState,
    /// Instruction builder created from the core and events configuration.
    instruction_builder: InstructionBuilder,
    /// The Hatless Ralph coordinator used to build prompts.
    ralph: HatlessRalph,
    /// Cached human guidance messages that should persist across iterations.
    robot_guidance: Vec<String>,
    /// Event reader for consuming events from JSONL file.
    /// Made pub(crate) to allow tests to override the path.
    pub(crate) event_reader: EventReader,
    /// Diagnostics collector used to log orchestration events; may be a
    /// disabled collector when initialization failed.
    diagnostics: crate::diagnostics::DiagnosticsCollector,
    /// Loop context for path resolution (None for legacy single-loop mode).
    loop_context: Option<LoopContext>,
    /// Skill registry for the current loop.
    skill_registry: SkillRegistry,
    /// Robot service for human-in-the-loop communication.
    /// Injected externally when `human.enabled` is true and this is the primary loop.
    robot_service: Option<Box<dyn RobotService>>,
}
123
124impl EventLoop {
125    /// Creates a new event loop from configuration.
126    pub fn new(config: RalphConfig) -> Self {
127        // Try to create diagnostics collector, but fall back to disabled if it fails
128        // (e.g., in tests without proper directory setup)
129        let diagnostics = crate::diagnostics::DiagnosticsCollector::new(std::path::Path::new("."))
130            .unwrap_or_else(|e| {
131                debug!(
132                    "Failed to initialize diagnostics: {}, using disabled collector",
133                    e
134                );
135                crate::diagnostics::DiagnosticsCollector::disabled()
136            });
137
138        Self::with_diagnostics(config, diagnostics)
139    }
140
141    /// Creates a new event loop with a loop context for path resolution.
142    ///
143    /// The loop context determines where events, tasks, and other state files
144    /// are located. Use this for multi-loop scenarios where each loop runs
145    /// in an isolated workspace (git worktree).
146    pub fn with_context(config: RalphConfig, context: LoopContext) -> Self {
147        let diagnostics = crate::diagnostics::DiagnosticsCollector::new(context.workspace())
148            .unwrap_or_else(|e| {
149                debug!(
150                    "Failed to initialize diagnostics: {}, using disabled collector",
151                    e
152                );
153                crate::diagnostics::DiagnosticsCollector::disabled()
154            });
155
156        Self::with_context_and_diagnostics(config, context, diagnostics)
157    }
158
159    /// Creates a new event loop with explicit loop context and diagnostics.
160    pub fn with_context_and_diagnostics(
161        config: RalphConfig,
162        context: LoopContext,
163        diagnostics: crate::diagnostics::DiagnosticsCollector,
164    ) -> Self {
165        let registry = HatRegistry::from_config(&config);
166        let instruction_builder =
167            InstructionBuilder::with_events(config.core.clone(), config.events.clone());
168
169        let mut bus = EventBus::new();
170
171        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
172        // Ralph is ALWAYS registered as the universal fallback for orphaned events.
173        // Custom hats are registered first (higher priority), Ralph catches everything else.
174        for hat in registry.all() {
175            bus.register(hat.clone());
176        }
177
178        // Always register Ralph as catch-all coordinator
179        // Per spec: "Ralph runs when no hat triggered — Universal fallback for orphaned events"
180        let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); // Subscribe to all events
181        bus.register(ralph_hat);
182
183        if registry.is_empty() {
184            debug!("Solo mode: Ralph is the only coordinator");
185        } else {
186            debug!(
187                "Multi-hat mode: {} custom hats + Ralph as fallback",
188                registry.len()
189            );
190        }
191
192        // Build skill registry from config
193        let skill_registry = if config.skills.enabled {
194            SkillRegistry::from_config(
195                &config.skills,
196                context.workspace(),
197                Some(config.cli.backend.as_str()),
198            )
199            .unwrap_or_else(|e| {
200                warn!(
201                    "Failed to build skill registry: {}, using empty registry",
202                    e
203                );
204                SkillRegistry::new(Some(config.cli.backend.as_str()))
205            })
206        } else {
207            SkillRegistry::new(Some(config.cli.backend.as_str()))
208        };
209
210        let skill_index = if config.skills.enabled {
211            skill_registry.build_index(None)
212        } else {
213            String::new()
214        };
215
216        // When memories are enabled, add tasks CLI instructions alongside scratchpad
217        let ralph = HatlessRalph::new(
218            config.event_loop.completion_promise.clone(),
219            config.core.clone(),
220            &registry,
221            config.event_loop.starting_event.clone(),
222        )
223        .with_memories_enabled(config.memories.enabled)
224        .with_skill_index(skill_index);
225
226        // Read timestamped events path from marker file, fall back to default
227        // The marker file contains a relative path like ".ralph/events-20260127-123456.jsonl"
228        // which we resolve relative to the workspace root
229        let events_path = std::fs::read_to_string(context.current_events_marker())
230            .map(|s| {
231                let relative = s.trim();
232                context.workspace().join(relative)
233            })
234            .unwrap_or_else(|_| context.events_path());
235        let event_reader = EventReader::new(&events_path);
236
237        Self {
238            config,
239            registry,
240            bus,
241            state: LoopState::new(),
242            instruction_builder,
243            ralph,
244            robot_guidance: Vec::new(),
245            event_reader,
246            diagnostics,
247            loop_context: Some(context),
248            skill_registry,
249            robot_service: None,
250        }
251    }
252
253    /// Creates a new event loop with explicit diagnostics collector (for testing).
254    pub fn with_diagnostics(
255        config: RalphConfig,
256        diagnostics: crate::diagnostics::DiagnosticsCollector,
257    ) -> Self {
258        let registry = HatRegistry::from_config(&config);
259        let instruction_builder =
260            InstructionBuilder::with_events(config.core.clone(), config.events.clone());
261
262        let mut bus = EventBus::new();
263
264        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
265        // Ralph is ALWAYS registered as the universal fallback for orphaned events.
266        // Custom hats are registered first (higher priority), Ralph catches everything else.
267        for hat in registry.all() {
268            bus.register(hat.clone());
269        }
270
271        // Always register Ralph as catch-all coordinator
272        // Per spec: "Ralph runs when no hat triggered — Universal fallback for orphaned events"
273        let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); // Subscribe to all events
274        bus.register(ralph_hat);
275
276        if registry.is_empty() {
277            debug!("Solo mode: Ralph is the only coordinator");
278        } else {
279            debug!(
280                "Multi-hat mode: {} custom hats + Ralph as fallback",
281                registry.len()
282            );
283        }
284
285        // Build skill registry from config
286        let workspace_root = std::path::Path::new(".");
287        let skill_registry = if config.skills.enabled {
288            SkillRegistry::from_config(
289                &config.skills,
290                workspace_root,
291                Some(config.cli.backend.as_str()),
292            )
293            .unwrap_or_else(|e| {
294                warn!(
295                    "Failed to build skill registry: {}, using empty registry",
296                    e
297                );
298                SkillRegistry::new(Some(config.cli.backend.as_str()))
299            })
300        } else {
301            SkillRegistry::new(Some(config.cli.backend.as_str()))
302        };
303
304        let skill_index = if config.skills.enabled {
305            skill_registry.build_index(None)
306        } else {
307            String::new()
308        };
309
310        // When memories are enabled, add tasks CLI instructions alongside scratchpad
311        let ralph = HatlessRalph::new(
312            config.event_loop.completion_promise.clone(),
313            config.core.clone(),
314            &registry,
315            config.event_loop.starting_event.clone(),
316        )
317        .with_memories_enabled(config.memories.enabled)
318        .with_skill_index(skill_index);
319
320        // Read events path from marker file, fall back to default if not present
321        // The marker file is written by run_loop_impl() at run startup
322        let events_path = std::fs::read_to_string(".ralph/current-events")
323            .map(|s| s.trim().to_string())
324            .unwrap_or_else(|_| ".ralph/events.jsonl".to_string());
325        let event_reader = EventReader::new(&events_path);
326
327        Self {
328            config,
329            registry,
330            bus,
331            state: LoopState::new(),
332            instruction_builder,
333            ralph,
334            robot_guidance: Vec::new(),
335            event_reader,
336            diagnostics,
337            loop_context: None,
338            skill_registry,
339            robot_service: None,
340        }
341    }
342
    /// Injects a robot service for human-in-the-loop communication.
    ///
    /// Call this after construction to enable `human.interact` event handling,
    /// periodic check-ins, and question/response flow. The service is typically
    /// created by the CLI layer (e.g., `TelegramService`) and injected here,
    /// keeping the core event loop decoupled from any specific communication
    /// platform.
    ///
    /// Any previously injected service is replaced (and dropped).
    pub fn set_robot_service(&mut self, service: Box<dyn RobotService>) {
        self.robot_service = Some(service);
    }
353
    /// Returns the loop context, if one was provided.
    ///
    /// `None` means the loop was built without a context (legacy single-loop mode).
    pub fn loop_context(&self) -> Option<&LoopContext> {
        self.loop_context.as_ref()
    }
358
359    /// Returns the tasks path based on loop context or default.
360    fn tasks_path(&self) -> PathBuf {
361        self.loop_context
362            .as_ref()
363            .map(|ctx| ctx.tasks_path())
364            .unwrap_or_else(|| PathBuf::from(".ralph/agent/tasks.jsonl"))
365    }
366
367    /// Returns the scratchpad path based on loop context or config.
368    fn scratchpad_path(&self) -> PathBuf {
369        self.loop_context
370            .as_ref()
371            .map(|ctx| ctx.scratchpad_path())
372            .unwrap_or_else(|| PathBuf::from(&self.config.core.scratchpad))
373    }
374
    /// Returns a shared reference to the current loop state.
    pub fn state(&self) -> &LoopState {
        &self.state
    }
379
    /// Returns the configuration this loop was constructed with.
    pub fn config(&self) -> &RalphConfig {
        &self.config
    }
384
    /// Returns the hat registry built from the configuration.
    pub fn registry(&self) -> &HatRegistry {
        &self.registry
    }
389
390    /// Gets the backend configuration for a hat.
391    ///
392    /// If the hat has a backend configured, returns that.
393    /// Otherwise, returns None (caller should use global backend).
394    pub fn get_hat_backend(&self, hat_id: &HatId) -> Option<&HatBackend> {
395        self.registry
396            .get_config(hat_id)
397            .and_then(|config| config.backend.as_ref())
398    }
399
    /// Adds an observer that receives all published events.
    ///
    /// Multiple observers can be added (e.g., session recorder + TUI).
    /// Each observer is called before events are routed to subscribers.
    ///
    /// The observer is forwarded to the underlying event bus.
    pub fn add_observer<F>(&mut self, observer: F)
    where
        F: Fn(&Event) + Send + 'static,
    {
        self.bus.add_observer(observer);
    }
410
    /// Sets a single observer, clearing any existing observers.
    ///
    /// Prefer `add_observer` when multiple observers are needed.
    #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
    pub fn set_observer<F>(&mut self, observer: F)
    where
        F: Fn(&Event) + Send + 'static,
    {
        // The deprecation lint is silenced for this one delegating call;
        // presumably the bus-level `set_observer` is deprecated as well.
        #[allow(deprecated)]
        self.bus.set_observer(observer);
    }
422
423    /// Checks if any termination condition is met.
424    pub fn check_termination(&self) -> Option<TerminationReason> {
425        let cfg = &self.config.event_loop;
426
427        if self.state.iteration >= cfg.max_iterations {
428            return Some(TerminationReason::MaxIterations);
429        }
430
431        if self.state.elapsed().as_secs() >= cfg.max_runtime_seconds {
432            return Some(TerminationReason::MaxRuntime);
433        }
434
435        if let Some(max_cost) = cfg.max_cost_usd
436            && self.state.cumulative_cost >= max_cost
437        {
438            return Some(TerminationReason::MaxCost);
439        }
440
441        if self.state.consecutive_failures >= cfg.max_consecutive_failures {
442            return Some(TerminationReason::ConsecutiveFailures);
443        }
444
445        // Check for loop thrashing: planner keeps dispatching abandoned tasks
446        if self.state.abandoned_task_redispatches >= 3 {
447            return Some(TerminationReason::LoopThrashing);
448        }
449
450        // Check for validation failures: too many consecutive malformed JSONL lines
451        if self.state.consecutive_malformed_events >= 3 {
452            return Some(TerminationReason::ValidationFailure);
453        }
454
455        // Check for stop signal from Telegram /stop or CLI stop-requested
456        let stop_path =
457            std::path::Path::new(&self.config.core.workspace_root).join(".ralph/stop-requested");
458        if stop_path.exists() {
459            let _ = std::fs::remove_file(&stop_path);
460            return Some(TerminationReason::Stopped);
461        }
462
463        // Check for restart signal from Telegram /restart command
464        let restart_path =
465            std::path::Path::new(&self.config.core.workspace_root).join(".ralph/restart-requested");
466        if restart_path.exists() {
467            return Some(TerminationReason::RestartRequested);
468        }
469
470        None
471    }
472
473    /// Checks if a completion event was received and returns termination reason.
474    ///
475    /// Completion is only accepted via JSONL events (e.g., `ralph emit`).
476    pub fn check_completion_event(&mut self) -> Option<TerminationReason> {
477        if !self.state.completion_requested {
478            return None;
479        }
480
481        self.state.completion_requested = false;
482
483        // In persistent mode, suppress completion and keep the loop alive
484        if self.config.event_loop.persistent {
485            info!("Completion event suppressed - persistent mode active, loop staying alive");
486
487            self.diagnostics.log_orchestration(
488                self.state.iteration,
489                "loop",
490                crate::diagnostics::OrchestrationEvent::LoopTerminated {
491                    reason: "completion_event_suppressed_persistent".to_string(),
492                },
493            );
494
495            // Inject a task.resume event so the loop continues with an idle prompt
496            let resume_event = Event::new(
497                "task.resume",
498                "Persistent mode: loop staying alive after completion signal. \
499                 Check for new tasks or await human guidance.",
500            );
501            self.bus.publish(resume_event);
502
503            return None;
504        }
505
506        // Log warning if tasks remain open (informational only)
507        if self.config.memories.enabled {
508            if let Ok(false) = self.verify_tasks_complete() {
509                let open_tasks = self.get_open_task_list();
510                warn!(
511                    open_tasks = ?open_tasks,
512                    "Completion event with {} open task(s) - trusting agent decision",
513                    open_tasks.len()
514                );
515            }
516        } else if let Ok(false) = self.verify_scratchpad_complete() {
517            warn!("Completion event with pending scratchpad tasks - trusting agent decision");
518        }
519
520        info!("Completion event detected - terminating");
521
522        // Log loop terminated
523        self.diagnostics.log_orchestration(
524            self.state.iteration,
525            "loop",
526            crate::diagnostics::OrchestrationEvent::LoopTerminated {
527                reason: "completion_event".to_string(),
528            },
529        );
530
531        Some(TerminationReason::CompletionPromise)
532    }
533
534    /// Initializes the loop by publishing the start event.
535    pub fn initialize(&mut self, prompt_content: &str) {
536        // Use configured starting_event or default to task.start for backward compatibility
537        let topic = self
538            .config
539            .event_loop
540            .starting_event
541            .clone()
542            .unwrap_or_else(|| "task.start".to_string());
543        self.initialize_with_topic(&topic, prompt_content);
544    }
545
546    /// Initializes the loop for resume mode by publishing task.resume.
547    ///
548    /// Per spec: "User can run `ralph resume` to restart reading existing scratchpad."
549    /// The planner should read the existing scratchpad rather than doing fresh gap analysis.
550    pub fn initialize_resume(&mut self, prompt_content: &str) {
551        // Resume always uses task.resume regardless of starting_event config
552        self.initialize_with_topic("task.resume", prompt_content);
553    }
554
555    /// Common initialization logic with configurable topic.
556    fn initialize_with_topic(&mut self, topic: &str, prompt_content: &str) {
557        // Store the objective so it persists across all iterations.
558        // After iteration 1, bus.take_pending() consumes the start event,
559        // so without this the objective would be invisible to later hats.
560        self.ralph.set_objective(prompt_content.to_string());
561
562        let start_event = Event::new(topic, prompt_content);
563        self.bus.publish(start_event);
564        debug!(topic = topic, "Published {} event", topic);
565    }
566
567    /// Gets the next hat to execute (if any have pending events).
568    ///
569    /// Per "Hatless Ralph" architecture: When custom hats are defined, Ralph is
570    /// always the executor. Custom hats define topology (pub/sub contracts) that
571    /// Ralph uses for coordination context, but Ralph handles all iterations.
572    ///
573    /// - Solo mode (no custom hats): Returns "ralph" if Ralph has pending events
574    /// - Multi-hat mode (custom hats defined): Always returns "ralph" if ANY hat has pending events
575    pub fn next_hat(&self) -> Option<&HatId> {
576        let next = self.bus.next_hat_with_pending();
577
578        // If no pending hat events but human interactions are pending, route to Ralph.
579        if next.is_none() && self.bus.has_human_pending() {
580            return self.bus.hat_ids().find(|id| id.as_str() == "ralph");
581        }
582
583        // If no pending events, return None
584        next.as_ref()?;
585
586        // In multi-hat mode, always route to Ralph (custom hats define topology only)
587        // Ralph's prompt includes the ## HATS section for coordination awareness
588        if self.registry.is_empty() {
589            // Solo mode - return the next hat (which is "ralph")
590            next
591        } else {
592            // Return "ralph" - the constant coordinator
593            // Find ralph in the bus's registered hats
594            self.bus.hat_ids().find(|id| id.as_str() == "ralph")
595        }
596    }
597
598    /// Checks if any hats have pending events.
599    ///
600    /// Use this after `process_output` to detect if the LLM failed to publish an event.
601    /// If false after processing, the loop will terminate on the next iteration.
602    pub fn has_pending_events(&self) -> bool {
603        self.bus.next_hat_with_pending().is_some() || self.bus.has_human_pending()
604    }
605
    /// Checks if any pending events are human-related (human.response, human.guidance).
    ///
    /// Used to skip cooldown delays when a human event is next, since we don't
    /// want to artificially delay the response to a human interaction.
    ///
    /// Delegates directly to the bus's human-pending check.
    pub fn has_pending_human_events(&self) -> bool {
        self.bus.has_human_pending()
    }
613
614    /// Gets the topics a hat is allowed to publish.
615    ///
616    /// Used to build retry prompts when the LLM forgets to publish an event.
617    pub fn get_hat_publishes(&self, hat_id: &HatId) -> Vec<String> {
618        self.registry
619            .get(hat_id)
620            .map(|hat| hat.publishes.iter().map(|t| t.to_string()).collect())
621            .unwrap_or_default()
622    }
623
624    /// Injects a fallback event to recover from a stalled loop.
625    ///
626    /// When no hats have pending events (agent failed to publish), this method
627    /// injects a `task.resume` event which Ralph will handle to attempt recovery.
628    ///
629    /// Returns true if a fallback event was injected, false if recovery is not possible.
630    pub fn inject_fallback_event(&mut self) -> bool {
631        let fallback_event = Event::new(
632            "task.resume",
633            "RECOVERY: Previous iteration did not publish an event. \
634             Review the scratchpad and either dispatch the next task or complete the loop.",
635        );
636
637        // If a custom hat was last executing, target the fallback back to it
638        // This preserves hat context instead of always falling back to Ralph
639        let fallback_event = match &self.state.last_hat {
640            Some(hat_id) if hat_id.as_str() != "ralph" => {
641                debug!(
642                    hat = %hat_id.as_str(),
643                    "Injecting fallback event to recover - targeting last hat with task.resume"
644                );
645                fallback_event.with_target(hat_id.clone())
646            }
647            _ => {
648                debug!("Injecting fallback event to recover - triggering Ralph with task.resume");
649                fallback_event
650            }
651        };
652
653        self.bus.publish(fallback_event);
654        true
655    }
656
657    /// Builds the prompt for a hat's execution.
658    ///
659    /// Per "Hatless Ralph" architecture:
660    /// - Solo mode: Ralph handles everything with his own prompt
661    /// - Multi-hat mode: Ralph is the sole executor, custom hats define topology only
662    ///
663    /// When in multi-hat mode, this method collects ALL pending events across all hats
664    /// and builds Ralph's prompt with that context. The `## HATS` section in Ralph's
665    /// prompt documents the topology for coordination awareness.
666    ///
667    /// If memories are configured with `inject: auto`, this method also prepends
668    /// primed memories to the prompt context. If a scratchpad file exists and is
669    /// non-empty, its content is also prepended (before memories).
670    pub fn build_prompt(&mut self, hat_id: &HatId) -> Option<String> {
671        // Handle "ralph" hat - the constant coordinator
672        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
673        if hat_id.as_str() == "ralph" {
674            if self.registry.is_empty() {
675                // Solo mode - just Ralph's events, no hats to filter
676                let mut events = self.bus.take_pending(&hat_id.clone());
677                let mut human_events = self.bus.take_human_pending();
678                events.append(&mut human_events);
679
680                // Separate human.guidance events from regular events
681                let (guidance_events, regular_events): (Vec<_>, Vec<_>) = events
682                    .into_iter()
683                    .partition(|e| e.topic.as_str() == "human.guidance");
684
685                let events_context = regular_events
686                    .iter()
687                    .map(|e| Self::format_event(e))
688                    .collect::<Vec<_>>()
689                    .join("\n");
690
691                // Persist and inject human guidance into prompt if present
692                self.update_robot_guidance(guidance_events);
693                self.apply_robot_guidance();
694
695                // Build base prompt and prepend memories + scratchpad + ready tasks
696                let base_prompt = self.ralph.build_prompt(&events_context, &[]);
697                self.ralph.clear_robot_guidance();
698                let with_skills = self.prepend_auto_inject_skills(base_prompt);
699                let with_scratchpad = self.prepend_scratchpad(with_skills);
700                let final_prompt = self.prepend_ready_tasks(with_scratchpad);
701
702                debug!("build_prompt: routing to HatlessRalph (solo mode)");
703                return Some(final_prompt);
704            } else {
705                // Multi-hat mode: collect events and determine active hats
706                let mut all_hat_ids: Vec<HatId> = self.bus.hat_ids().cloned().collect();
707                // Deterministic ordering (avoid HashMap iteration order nondeterminism).
708                all_hat_ids.sort_by(|a, b| a.as_str().cmp(b.as_str()));
709
710                let mut all_events = Vec::new();
711                let mut system_events = Vec::new();
712
713                for id in &all_hat_ids {
714                    let pending = self.bus.take_pending(id);
715                    if pending.is_empty() {
716                        continue;
717                    }
718
719                    let (drop_pending, exhausted_event) = self.check_hat_exhaustion(id, &pending);
720                    if drop_pending {
721                        // Drop the pending events that would have activated the hat.
722                        if let Some(exhausted_event) = exhausted_event {
723                            all_events.push(exhausted_event.clone());
724                            system_events.push(exhausted_event);
725                        }
726                        continue;
727                    }
728
729                    all_events.extend(pending);
730                }
731
732                let mut human_events = self.bus.take_human_pending();
733                all_events.append(&mut human_events);
734
735                // Publish orchestrator-generated system events after consuming pending events,
736                // so they become visible in the event log and can be handled next iteration.
737                for event in system_events {
738                    self.bus.publish(event);
739                }
740
741                // Separate human.guidance events from regular events
742                let (guidance_events, regular_events): (Vec<_>, Vec<_>) = all_events
743                    .into_iter()
744                    .partition(|e| e.topic.as_str() == "human.guidance");
745
746                // Persist and inject human guidance before building prompt (must happen before
747                // immutable borrows from determine_active_hats)
748                self.update_robot_guidance(guidance_events);
749                self.apply_robot_guidance();
750
751                // Determine which hats are active based on regular events
752                let active_hat_ids = self.determine_active_hat_ids(&regular_events);
753                self.record_hat_activations(&active_hat_ids);
754                let active_hats = self.determine_active_hats(&regular_events);
755
756                // Format events for context
757                let events_context = regular_events
758                    .iter()
759                    .map(|e| Self::format_event(e))
760                    .collect::<Vec<_>>()
761                    .join("\n");
762
763                // Build base prompt and prepend memories + scratchpad if available
764                let base_prompt = self.ralph.build_prompt(&events_context, &active_hats);
765
766                // Build prompt with active hats - filters instructions to only active hats
767                debug!(
768                    "build_prompt: routing to HatlessRalph (multi-hat coordinator mode), active_hats: {:?}",
769                    active_hats
770                        .iter()
771                        .map(|h| h.id.as_str())
772                        .collect::<Vec<_>>()
773                );
774
775                // Clear guidance after active_hats references are no longer needed
776                self.ralph.clear_robot_guidance();
777                let with_skills = self.prepend_auto_inject_skills(base_prompt);
778                let with_scratchpad = self.prepend_scratchpad(with_skills);
779                let final_prompt = self.prepend_ready_tasks(with_scratchpad);
780
781                return Some(final_prompt);
782            }
783        }
784
785        // Non-ralph hat requested - this shouldn't happen in multi-hat mode since
786        // next_hat() always returns "ralph" when custom hats are defined.
787        // But we keep this code path for backward compatibility and tests.
788        let events = self.bus.take_pending(&hat_id.clone());
789        let events_context = events
790            .iter()
791            .map(|e| Self::format_event(e))
792            .collect::<Vec<_>>()
793            .join("\n");
794
795        let hat = self.registry.get(hat_id)?;
796
797        // Debug logging to trace hat routing
798        debug!(
799            "build_prompt: hat_id='{}', instructions.is_empty()={}",
800            hat_id.as_str(),
801            hat.instructions.is_empty()
802        );
803
804        // All hats use build_custom_hat with ghuntley-style prompts
805        debug!(
806            "build_prompt: routing to build_custom_hat() for '{}'",
807            hat_id.as_str()
808        );
809        Some(
810            self.instruction_builder
811                .build_custom_hat(hat, &events_context),
812        )
813    }
814
815    /// Stores guidance payloads, persists them to scratchpad, and prepares them for prompt injection.
816    ///
817    /// Guidance events are ephemeral in the event bus (consumed by `take_pending`).
818    /// This method both caches them in memory for prompt injection and appends
819    /// them to the scratchpad file so they survive across process restarts.
820    fn update_robot_guidance(&mut self, guidance_events: Vec<Event>) {
821        if guidance_events.is_empty() {
822            return;
823        }
824
825        // Persist new guidance to scratchpad before caching
826        self.persist_guidance_to_scratchpad(&guidance_events);
827
828        self.robot_guidance
829            .extend(guidance_events.into_iter().map(|e| e.payload));
830    }
831
832    /// Appends human guidance entries to the scratchpad file for durability.
833    ///
834    /// Each guidance message is written as a timestamped markdown entry so it
835    /// appears alongside the agent's own thinking and survives process restarts.
836    fn persist_guidance_to_scratchpad(&self, guidance_events: &[Event]) {
837        use std::io::Write;
838
839        let scratchpad_path = self.scratchpad_path();
840        let resolved_path = if scratchpad_path.is_relative() {
841            self.config.core.workspace_root.join(&scratchpad_path)
842        } else {
843            scratchpad_path
844        };
845
846        // Create parent directories if needed
847        if let Some(parent) = resolved_path.parent()
848            && !parent.exists()
849            && let Err(e) = std::fs::create_dir_all(parent)
850        {
851            warn!("Failed to create scratchpad directory: {}", e);
852            return;
853        }
854
855        let mut file = match std::fs::OpenOptions::new()
856            .create(true)
857            .append(true)
858            .open(&resolved_path)
859        {
860            Ok(f) => f,
861            Err(e) => {
862                warn!("Failed to open scratchpad for guidance persistence: {}", e);
863                return;
864            }
865        };
866
867        let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
868        for event in guidance_events {
869            let entry = format!(
870                "\n### HUMAN GUIDANCE ({})\n\n{}\n",
871                timestamp, event.payload
872            );
873            if let Err(e) = file.write_all(entry.as_bytes()) {
874                warn!("Failed to write guidance to scratchpad: {}", e);
875            }
876        }
877
878        info!(
879            count = guidance_events.len(),
880            "Persisted human guidance to scratchpad"
881        );
882    }
883
884    /// Injects cached guidance into the next prompt build.
885    fn apply_robot_guidance(&mut self) {
886        if self.robot_guidance.is_empty() {
887            return;
888        }
889
890        self.ralph.set_robot_guidance(self.robot_guidance.clone());
891    }
892
893    /// Prepends auto-injected skill content to the prompt.
894    ///
895    /// This generalizes the former `prepend_memories()` into a skill auto-injection
896    /// pipeline that handles memories, tools, and any other auto-inject skills.
897    ///
898    /// Injection order:
899    /// 1. Memory data + ralph-tools skill (special case: loads memory data from store, applies budget)
900    /// 2. RObot interaction skill (gated by `robot.enabled`)
901    /// 3. Other auto-inject skills from the registry (wrapped in XML tags)
902    fn prepend_auto_inject_skills(&self, prompt: String) -> String {
903        let mut prefix = String::new();
904
905        // 1. Memory data + ralph-tools skill — special case with data loading
906        self.inject_memories_and_tools_skill(&mut prefix);
907
908        // 2. RObot interaction skill — gated by robot.enabled
909        self.inject_robot_skill(&mut prefix);
910
911        // 3. Other auto-inject skills from the registry
912        self.inject_custom_auto_skills(&mut prefix);
913
914        if prefix.is_empty() {
915            return prompt;
916        }
917
918        prefix.push_str("\n\n");
919        prefix.push_str(&prompt);
920        prefix
921    }
922
923    /// Injects memory data and the ralph-tools skill into the prefix.
924    ///
925    /// Special case: loads memory entries from the store, applies budget
926    /// truncation, then appends the ralph-tools skill content (which covers
927    /// both tasks and memories CLI usage).
928    /// Memory data is gated by `memories.enabled && memories.inject == Auto`.
929    /// The ralph-tools skill is injected when either memories or tasks are enabled.
930    fn inject_memories_and_tools_skill(&self, prefix: &mut String) {
931        let memories_config = &self.config.memories;
932
933        // Inject memory DATA if memories are enabled with auto-inject
934        if memories_config.enabled && memories_config.inject == InjectMode::Auto {
935            info!(
936                "Memory injection check: enabled={}, inject={:?}, workspace_root={:?}",
937                memories_config.enabled, memories_config.inject, self.config.core.workspace_root
938            );
939
940            let workspace_root = &self.config.core.workspace_root;
941            let store = MarkdownMemoryStore::with_default_path(workspace_root);
942            let memories_path = workspace_root.join(".ralph/agent/memories.md");
943
944            info!(
945                "Looking for memories at: {:?} (exists: {})",
946                memories_path,
947                memories_path.exists()
948            );
949
950            let memories = match store.load() {
951                Ok(memories) => {
952                    info!("Successfully loaded {} memories from store", memories.len());
953                    memories
954                }
955                Err(e) => {
956                    info!(
957                        "Failed to load memories for injection: {} (path: {:?})",
958                        e, memories_path
959                    );
960                    Vec::new()
961                }
962            };
963
964            if memories.is_empty() {
965                info!("Memory store is empty - no memories to inject");
966            } else {
967                let mut memories_content = format_memories_as_markdown(&memories);
968
969                if memories_config.budget > 0 {
970                    let original_len = memories_content.len();
971                    memories_content =
972                        truncate_to_budget(&memories_content, memories_config.budget);
973                    debug!(
974                        "Applied budget: {} chars -> {} chars (budget: {})",
975                        original_len,
976                        memories_content.len(),
977                        memories_config.budget
978                    );
979                }
980
981                info!(
982                    "Injecting {} memories ({} chars) into prompt",
983                    memories.len(),
984                    memories_content.len()
985                );
986
987                prefix.push_str(&memories_content);
988            }
989        }
990
991        // Inject the ralph-tools skill when either memories or tasks are enabled
992        if memories_config.enabled || self.config.tasks.enabled {
993            if let Some(skill) = self.skill_registry.get("ralph-tools") {
994                if !prefix.is_empty() {
995                    prefix.push_str("\n\n");
996                }
997                prefix.push_str(&format!(
998                    "<ralph-tools-skill>\n{}\n</ralph-tools-skill>",
999                    skill.content.trim()
1000                ));
1001                debug!("Injected ralph-tools skill from registry");
1002            } else {
1003                debug!("ralph-tools skill not found in registry - skill content not injected");
1004            }
1005        }
1006    }
1007
1008    /// Injects the RObot interaction skill content into the prefix.
1009    ///
1010    /// Gated by `robot.enabled`. Teaches agents how and when to interact
1011    /// with humans via `human.interact` events.
1012    fn inject_robot_skill(&self, prefix: &mut String) {
1013        if !self.config.robot.enabled {
1014            return;
1015        }
1016
1017        if let Some(skill) = self.skill_registry.get("robot-interaction") {
1018            if !prefix.is_empty() {
1019                prefix.push_str("\n\n");
1020            }
1021            prefix.push_str(&format!(
1022                "<robot-skill>\n{}\n</robot-skill>",
1023                skill.content.trim()
1024            ));
1025            debug!("Injected robot interaction skill from registry");
1026        }
1027    }
1028
1029    /// Injects any user-configured auto-inject skills (excluding built-in ralph-tools/robot-interaction).
1030    fn inject_custom_auto_skills(&self, prefix: &mut String) {
1031        for skill in self.skill_registry.auto_inject_skills(None) {
1032            // Skip built-in skills handled above
1033            if skill.name == "ralph-tools" || skill.name == "robot-interaction" {
1034                continue;
1035            }
1036
1037            if !prefix.is_empty() {
1038                prefix.push_str("\n\n");
1039            }
1040            prefix.push_str(&format!(
1041                "<{name}-skill>\n{content}\n</{name}-skill>",
1042                name = skill.name,
1043                content = skill.content.trim()
1044            ));
1045            debug!("Injected auto-inject skill: {}", skill.name);
1046        }
1047    }
1048
1049    /// Prepends scratchpad content to the prompt if the file exists and is non-empty.
1050    ///
1051    /// The scratchpad is the agent's working memory for the current objective.
1052    /// Auto-injecting saves one tool call per iteration.
1053    /// When the file exceeds the budget, the TAIL is kept (most recent entries).
1054    fn prepend_scratchpad(&self, prompt: String) -> String {
1055        let scratchpad_path = self.scratchpad_path();
1056
1057        let resolved_path = if scratchpad_path.is_relative() {
1058            self.config.core.workspace_root.join(&scratchpad_path)
1059        } else {
1060            scratchpad_path
1061        };
1062
1063        if !resolved_path.exists() {
1064            debug!(
1065                "Scratchpad not found at {:?}, skipping injection",
1066                resolved_path
1067            );
1068            return prompt;
1069        }
1070
1071        let content = match std::fs::read_to_string(&resolved_path) {
1072            Ok(c) => c,
1073            Err(e) => {
1074                info!("Failed to read scratchpad for injection: {}", e);
1075                return prompt;
1076            }
1077        };
1078
1079        if content.trim().is_empty() {
1080            debug!("Scratchpad is empty, skipping injection");
1081            return prompt;
1082        }
1083
1084        // Budget: 4000 tokens ~16000 chars. Keep the TAIL (most recent content).
1085        let char_budget = 4000 * 4;
1086        let content = if content.len() > char_budget {
1087            // Find a line boundary near the start of the tail (ensure UTF-8 boundary)
1088            let mut start = content.len() - char_budget;
1089            while start < content.len() && !content.is_char_boundary(start) {
1090                start += 1;
1091            }
1092            let line_start = content[start..].find('\n').map_or(start, |n| start + n + 1);
1093            let discarded = &content[..line_start];
1094
1095            // Summarize discarded content by extracting markdown headings
1096            let headings: Vec<&str> = discarded
1097                .lines()
1098                .filter(|line| line.starts_with('#'))
1099                .collect();
1100            let summary = if headings.is_empty() {
1101                format!(
1102                    "<!-- earlier content truncated ({} chars omitted) -->",
1103                    line_start
1104                )
1105            } else {
1106                format!(
1107                    "<!-- earlier content truncated ({} chars omitted) -->\n\
1108                     <!-- discarded sections: {} -->",
1109                    line_start,
1110                    headings.join(" | ")
1111                )
1112            };
1113
1114            format!("{}\n\n{}", summary, &content[line_start..])
1115        } else {
1116            content
1117        };
1118
1119        info!("Injecting scratchpad ({} chars) into prompt", content.len());
1120
1121        let mut final_prompt = format!(
1122            "<scratchpad path=\"{}\">\n{}\n</scratchpad>\n\n",
1123            self.config.core.scratchpad, content
1124        );
1125        final_prompt.push_str(&prompt);
1126        final_prompt
1127    }
1128
1129    /// Prepends ready tasks to the prompt if tasks are enabled and any exist.
1130    ///
1131    /// Loads the task store and formats ready (unblocked, open) tasks into
1132    /// a `<ready-tasks>` XML block. This saves the agent a tool call per
1133    /// iteration and puts tasks at the same prominence as the scratchpad.
1134    fn prepend_ready_tasks(&self, prompt: String) -> String {
1135        if !self.config.tasks.enabled {
1136            return prompt;
1137        }
1138
1139        use crate::task::TaskStatus;
1140        use crate::task_store::TaskStore;
1141
1142        let tasks_path = self.tasks_path();
1143        let resolved_path = if tasks_path.is_relative() {
1144            self.config.core.workspace_root.join(&tasks_path)
1145        } else {
1146            tasks_path
1147        };
1148
1149        if !resolved_path.exists() {
1150            return prompt;
1151        }
1152
1153        let store = match TaskStore::load(&resolved_path) {
1154            Ok(s) => s,
1155            Err(e) => {
1156                info!("Failed to load task store for injection: {}", e);
1157                return prompt;
1158            }
1159        };
1160
1161        let ready = store.ready();
1162        let open = store.open();
1163        let closed_count = store.all().len() - open.len();
1164
1165        if open.is_empty() && closed_count == 0 {
1166            return prompt;
1167        }
1168
1169        let mut section = String::from("<ready-tasks>\n");
1170        if ready.is_empty() && open.is_empty() {
1171            section.push_str("No open tasks. Create tasks with `ralph tools task add`.\n");
1172        } else {
1173            section.push_str(&format!(
1174                "## Tasks: {} ready, {} open, {} closed\n\n",
1175                ready.len(),
1176                open.len(),
1177                closed_count
1178            ));
1179            for task in &ready {
1180                let status_icon = match task.status {
1181                    TaskStatus::Open => "[ ]",
1182                    TaskStatus::InProgress => "[~]",
1183                    _ => "[?]",
1184                };
1185                section.push_str(&format!(
1186                    "- {} [P{}] {} ({})\n",
1187                    status_icon, task.priority, task.title, task.id
1188                ));
1189            }
1190            // Show blocked tasks separately so agent knows they exist
1191            let ready_ids: Vec<&str> = ready.iter().map(|t| t.id.as_str()).collect();
1192            let blocked: Vec<_> = open
1193                .iter()
1194                .filter(|t| !ready_ids.contains(&t.id.as_str()))
1195                .collect();
1196            if !blocked.is_empty() {
1197                section.push_str("\nBlocked:\n");
1198                for task in blocked {
1199                    section.push_str(&format!(
1200                        "- [blocked] [P{}] {} ({}) — blocked by: {}\n",
1201                        task.priority,
1202                        task.title,
1203                        task.id,
1204                        task.blocked_by.join(", ")
1205                    ));
1206                }
1207            }
1208        }
1209        section.push_str("</ready-tasks>\n\n");
1210
1211        info!(
1212            "Injecting ready tasks ({} ready, {} open, {} closed) into prompt",
1213            ready.len(),
1214            open.len(),
1215            closed_count
1216        );
1217
1218        let mut final_prompt = section;
1219        final_prompt.push_str(&prompt);
1220        final_prompt
1221    }
1222
1223    /// Builds the Ralph prompt (coordination mode).
1224    pub fn build_ralph_prompt(&self, prompt_content: &str) -> String {
1225        self.ralph.build_prompt(prompt_content, &[])
1226    }
1227
1228    /// Determines which hats should be active based on pending events.
1229    /// Returns list of Hat references that are triggered by any pending event.
1230    fn determine_active_hats(&self, events: &[Event]) -> Vec<&Hat> {
1231        let mut active_hats = Vec::new();
1232        for id in self.determine_active_hat_ids(events) {
1233            if let Some(hat) = self.registry.get(&id) {
1234                active_hats.push(hat);
1235            }
1236        }
1237        active_hats
1238    }
1239
1240    fn determine_active_hat_ids(&self, events: &[Event]) -> Vec<HatId> {
1241        let mut active_hat_ids = Vec::new();
1242        for event in events {
1243            if let Some(hat) = self.registry.get_for_topic(event.topic.as_str()) {
1244                // Avoid duplicates
1245                if !active_hat_ids.iter().any(|id| id == &hat.id) {
1246                    active_hat_ids.push(hat.id.clone());
1247                }
1248            }
1249        }
1250        active_hat_ids
1251    }
1252
1253    /// Formats an event for prompt context.
1254    ///
1255    /// For top-level prompts (task.start, task.resume), wraps the payload in
1256    /// `<top-level-prompt>` XML tags to clearly delineate the user's original request.
1257    fn format_event(event: &Event) -> String {
1258        let topic = &event.topic;
1259        let payload = &event.payload;
1260
1261        if topic.as_str() == "task.start" || topic.as_str() == "task.resume" {
1262            format!(
1263                "Event: {} - <top-level-prompt>\n{}\n</top-level-prompt>",
1264                topic, payload
1265            )
1266        } else {
1267            format!("Event: {} - {}", topic, payload)
1268        }
1269    }
1270
1271    fn check_hat_exhaustion(&mut self, hat_id: &HatId, dropped: &[Event]) -> (bool, Option<Event>) {
1272        let Some(config) = self.registry.get_config(hat_id) else {
1273            return (false, None);
1274        };
1275        let Some(max) = config.max_activations else {
1276            return (false, None);
1277        };
1278
1279        let count = *self.state.hat_activation_counts.get(hat_id).unwrap_or(&0);
1280        if count < max {
1281            return (false, None);
1282        }
1283
1284        // Emit only once per hat per run (avoid flooding).
1285        let should_emit = self.state.exhausted_hats.insert(hat_id.clone());
1286
1287        if !should_emit {
1288            // Hat is already exhausted - drop pending events silently.
1289            return (true, None);
1290        }
1291
1292        let mut dropped_topics: Vec<String> = dropped.iter().map(|e| e.topic.to_string()).collect();
1293        dropped_topics.sort();
1294
1295        let payload = format!(
1296            "Hat '{hat}' exhausted.\n- max_activations: {max}\n- activations: {count}\n- dropped_topics:\n  - {topics}",
1297            hat = hat_id.as_str(),
1298            max = max,
1299            count = count,
1300            topics = dropped_topics.join("\n  - ")
1301        );
1302
1303        warn!(
1304            hat = %hat_id.as_str(),
1305            max_activations = max,
1306            activations = count,
1307            "Hat exhausted (max_activations reached)"
1308        );
1309
1310        (
1311            true,
1312            Some(Event::new(
1313                format!("{}.exhausted", hat_id.as_str()),
1314                payload,
1315            )),
1316        )
1317    }
1318
1319    fn record_hat_activations(&mut self, active_hat_ids: &[HatId]) {
1320        for hat_id in active_hat_ids {
1321            *self
1322                .state
1323                .hat_activation_counts
1324                .entry(hat_id.clone())
1325                .or_insert(0) += 1;
1326        }
1327    }
1328
1329    /// Returns the primary active hat ID for display purposes.
1330    /// Returns the first active hat, or "ralph" if no specific hat is active.
1331    pub fn get_active_hat_id(&self) -> HatId {
1332        // Peek at pending events (don't consume them)
1333        for hat_id in self.bus.hat_ids() {
1334            let Some(events) = self.bus.peek_pending(hat_id) else {
1335                continue;
1336            };
1337            let Some(event) = events.first() else {
1338                continue;
1339            };
1340            if let Some(active_hat) = self.registry.get_for_topic(event.topic.as_str()) {
1341                return active_hat.id.clone();
1342            }
1343        }
1344        HatId::new("ralph")
1345    }
1346
1347    /// Records the current event count before hat execution.
1348    ///
1349    /// Call this before executing a hat, then use `check_default_publishes`
1350    /// after execution to inject a fallback event if needed.
1351    pub fn record_event_count(&mut self) -> usize {
1352        self.event_reader
1353            .read_new_events()
1354            .map(|r| r.events.len())
1355            .unwrap_or(0)
1356    }
1357
1358    /// Checks if hat wrote any events, and injects default if configured.
1359    ///
1360    /// Call this after hat execution with the count from `record_event_count`.
1361    /// If no new events were written AND the hat has `default_publishes` configured,
1362    /// this will inject the default event automatically.
1363    pub fn check_default_publishes(&mut self, hat_id: &HatId, _events_before: usize) {
1364        let events_after = self
1365            .event_reader
1366            .read_new_events()
1367            .map(|r| r.events.len())
1368            .unwrap_or(0);
1369
1370        if events_after == 0
1371            && let Some(config) = self.registry.get_config(hat_id)
1372            && let Some(default_topic) = &config.default_publishes
1373        {
1374            // No new events written - inject default event
1375            let default_event = Event::new(default_topic.as_str(), "").with_source(hat_id.clone());
1376
1377            debug!(
1378                hat = %hat_id.as_str(),
1379                topic = %default_topic,
1380                "No events written by hat, injecting default_publishes event"
1381            );
1382
1383            self.bus.publish(default_event);
1384        }
1385    }
1386
1387    /// Returns a mutable reference to the event bus for direct event publishing.
1388    ///
1389    /// This is primarily used for planning sessions to inject user responses
1390    /// as events into the orchestration loop.
1391    pub fn bus(&mut self) -> &mut EventBus {
1392        &mut self.bus
1393    }
1394
1395    /// Processes output from a hat execution.
1396    ///
1397    /// Returns the termination reason if the loop should stop.
1398    pub fn process_output(
1399        &mut self,
1400        hat_id: &HatId,
1401        output: &str,
1402        success: bool,
1403    ) -> Option<TerminationReason> {
1404        self.state.iteration += 1;
1405        self.state.last_hat = Some(hat_id.clone());
1406
1407        // Periodic robot check-in
1408        if let Some(interval_secs) = self.config.robot.checkin_interval_seconds
1409            && let Some(ref robot_service) = self.robot_service
1410        {
1411            let elapsed = self.state.elapsed();
1412            let interval = std::time::Duration::from_secs(interval_secs);
1413            let last = self
1414                .state
1415                .last_checkin_at
1416                .map(|t| t.elapsed())
1417                .unwrap_or(elapsed);
1418
1419            if last >= interval {
1420                let context = self.build_checkin_context(hat_id);
1421                match robot_service.send_checkin(self.state.iteration, elapsed, Some(&context)) {
1422                    Ok(_) => {
1423                        self.state.last_checkin_at = Some(std::time::Instant::now());
1424                        debug!(iteration = self.state.iteration, "Sent robot check-in");
1425                    }
1426                    Err(e) => {
1427                        warn!(error = %e, "Failed to send robot check-in");
1428                    }
1429                }
1430            }
1431        }
1432
1433        // Log iteration started
1434        self.diagnostics.log_orchestration(
1435            self.state.iteration,
1436            "loop",
1437            crate::diagnostics::OrchestrationEvent::IterationStarted,
1438        );
1439
1440        // Log hat selected
1441        self.diagnostics.log_orchestration(
1442            self.state.iteration,
1443            "loop",
1444            crate::diagnostics::OrchestrationEvent::HatSelected {
1445                hat: hat_id.to_string(),
1446                reason: "process_output".to_string(),
1447            },
1448        );
1449
1450        // Track failures
1451        if success {
1452            self.state.consecutive_failures = 0;
1453        } else {
1454            self.state.consecutive_failures += 1;
1455        }
1456
1457        let _ = output;
1458
1459        // Events are ONLY read from the JSONL file written by `ralph emit`.
1460        // This enforces tool use and prevents confabulation (agent claiming to emit without actually doing so).
1461        // See process_events_from_jsonl() for event processing.
1462
1463        // Check termination conditions
1464        self.check_termination()
1465    }
1466
1467    /// Extracts task identifier from build.blocked payload.
1468    /// Uses first line of payload as task ID.
1469    fn extract_task_id(payload: &str) -> String {
1470        payload
1471            .lines()
1472            .next()
1473            .unwrap_or("unknown")
1474            .trim()
1475            .to_string()
1476    }
1477
1478    /// Adds cost to the cumulative total.
1479    pub fn add_cost(&mut self, cost: f64) {
1480        self.state.cumulative_cost += cost;
1481    }
1482
1483    /// Verifies all tasks in scratchpad are complete or cancelled.
1484    ///
1485    /// Returns:
1486    /// - `Ok(true)` if all tasks are `[x]` or `[~]`
1487    /// - `Ok(false)` if any tasks are `[ ]` (pending)
1488    /// - `Err(...)` if scratchpad doesn't exist or can't be read
1489    fn verify_scratchpad_complete(&self) -> Result<bool, std::io::Error> {
1490        let scratchpad_path = self.scratchpad_path();
1491
1492        if !scratchpad_path.exists() {
1493            return Err(std::io::Error::new(
1494                std::io::ErrorKind::NotFound,
1495                "Scratchpad does not exist",
1496            ));
1497        }
1498
1499        let content = std::fs::read_to_string(scratchpad_path)?;
1500
1501        let has_pending = content
1502            .lines()
1503            .any(|line| line.trim_start().starts_with("- [ ]"));
1504
1505        Ok(!has_pending)
1506    }
1507
1508    fn verify_tasks_complete(&self) -> Result<bool, std::io::Error> {
1509        use crate::task_store::TaskStore;
1510
1511        let tasks_path = self.tasks_path();
1512
1513        // No tasks file = no pending tasks = complete
1514        if !tasks_path.exists() {
1515            return Ok(true);
1516        }
1517
1518        let store = TaskStore::load(&tasks_path)?;
1519        Ok(!store.has_pending_tasks())
1520    }
1521
1522    /// Builds a [`CheckinContext`] with current loop state for robot check-ins.
1523    fn build_checkin_context(&self, hat_id: &HatId) -> CheckinContext {
1524        let (open_tasks, closed_tasks) = self.count_tasks();
1525        CheckinContext {
1526            current_hat: Some(hat_id.as_str().to_string()),
1527            open_tasks,
1528            closed_tasks,
1529            cumulative_cost: self.state.cumulative_cost,
1530        }
1531    }
1532
1533    /// Counts open and closed tasks from the task store.
1534    ///
1535    /// Returns `(open_count, closed_count)`. "Open" means non-terminal tasks,
1536    /// "closed" means tasks with `TaskStatus::Closed`.
1537    fn count_tasks(&self) -> (usize, usize) {
1538        use crate::task::TaskStatus;
1539        use crate::task_store::TaskStore;
1540
1541        let tasks_path = self.tasks_path();
1542        if !tasks_path.exists() {
1543            return (0, 0);
1544        }
1545
1546        match TaskStore::load(&tasks_path) {
1547            Ok(store) => {
1548                let total = store.all().len();
1549                let open = store.open().len();
1550                let closed = total - open;
1551                // Verify: closed should match Closed status count
1552                debug_assert_eq!(
1553                    closed,
1554                    store
1555                        .all()
1556                        .iter()
1557                        .filter(|t| t.status == TaskStatus::Closed)
1558                        .count()
1559                );
1560                (open, closed)
1561            }
1562            Err(_) => (0, 0),
1563        }
1564    }
1565
1566    /// Returns a list of open task descriptions for logging purposes.
1567    fn get_open_task_list(&self) -> Vec<String> {
1568        use crate::task_store::TaskStore;
1569
1570        let tasks_path = self.tasks_path();
1571        if let Ok(store) = TaskStore::load(&tasks_path) {
1572            return store
1573                .open()
1574                .iter()
1575                .map(|t| format!("{}: {}", t.id, t.title))
1576                .collect();
1577        }
1578        vec![]
1579    }
1580
1581    fn warn_on_mutation_evidence(&self, evidence: &crate::event_parser::BackpressureEvidence) {
1582        let threshold = self.config.event_loop.mutation_score_warn_threshold;
1583
1584        match &evidence.mutants {
1585            Some(mutants) => {
1586                if let Some(reason) = Self::mutation_warning_reason(mutants, threshold) {
1587                    warn!(
1588                        reason = %reason,
1589                        mutants_status = ?mutants.status,
1590                        mutants_score = mutants.score_percent,
1591                        mutants_threshold = threshold,
1592                        "Mutation testing warning"
1593                    );
1594                }
1595            }
1596            None => {
1597                if let Some(threshold) = threshold {
1598                    warn!(
1599                        mutants_threshold = threshold,
1600                        "Mutation testing warning: missing mutation evidence in build.done payload"
1601                    );
1602                }
1603            }
1604        }
1605    }
1606
1607    fn mutation_warning_reason(
1608        mutants: &MutationEvidence,
1609        threshold: Option<f64>,
1610    ) -> Option<String> {
1611        match mutants.status {
1612            MutationStatus::Fail => Some("mutation testing failed".to_string()),
1613            MutationStatus::Warn => Some(Self::format_mutation_message(
1614                "mutation score below threshold",
1615                mutants.score_percent,
1616            )),
1617            MutationStatus::Unknown => Some("mutation testing status unknown".to_string()),
1618            MutationStatus::Pass => {
1619                let threshold = threshold?;
1620
1621                match mutants.score_percent {
1622                    Some(score) if score < threshold => Some(format!(
1623                        "mutation score {:.2}% below threshold {:.2}%",
1624                        score, threshold
1625                    )),
1626                    Some(_) => None,
1627                    None => Some(format!(
1628                        "mutation score missing (threshold {:.2}%)",
1629                        threshold
1630                    )),
1631                }
1632            }
1633        }
1634    }
1635
1636    fn format_mutation_message(message: &str, score: Option<f64>) -> String {
1637        match score {
1638            Some(score) => format!("{message} ({score:.2}%)"),
1639            None => message.to_string(),
1640        }
1641    }
1642
    /// Processes events from JSONL and routes orphaned events to Ralph.
    ///
    /// Steps, in execution order:
    /// 1. Each malformed line is published as an `event.malformed` event and
    ///    increments `consecutive_malformed_events`. The counter is reset to
    ///    zero if the same read also produced at least one valid event.
    /// 2. An event on the configured completion topic sets
    ///    `completion_requested` only when it is the last event of the batch;
    ///    otherwise it is ignored with a warning. It is never forwarded to the
    ///    bus in either case.
    /// 3. `build.done`, `review.done` and `verify.passed` must carry parseable,
    ///    passing evidence. Otherwise they are replaced by a synthesized
    ///    `build.blocked`, `review.blocked` or `verify.failed` event.
    /// 4. `build.blocked` events update the per-task block counters (publishing
    ///    `build.task.abandoned` once a task reaches three blocks) and the
    ///    `consecutive_blocked` counter.
    /// 5. If the batch contains a `human.interact` event and a robot service is
    ///    configured, the question is sent and this call blocks in
    ///    `wait_for_response` until a response, a timeout or an error.
    /// 6. The validated events are published, followed by a `human.response`
    ///    event if one was received.
    ///
    /// Returns `Ok(true)` when at least one published event has no subscriber
    /// in the hat registry (an "orphaned" event that Ralph should handle), and
    /// `Ok(false)` otherwise, including when nothing new was read.
    ///
    /// # Errors
    ///
    /// Propagates the I/O error returned by `event_reader.read_new_events()`.
    pub fn process_events_from_jsonl(&mut self) -> std::io::Result<bool> {
        let result = self.event_reader.read_new_events()?;

        // Handle malformed lines with backpressure
        for malformed in &result.malformed {
            let payload = format!(
                "Line {}: {}\nContent: {}",
                malformed.line_number, malformed.error, &malformed.content
            );
            let event = Event::new("event.malformed", &payload);
            self.bus.publish(event);
            self.state.consecutive_malformed_events += 1;
            warn!(
                line = malformed.line_number,
                consecutive = self.state.consecutive_malformed_events,
                "Malformed event line detected"
            );
        }

        // Reset counter when valid events are parsed
        // (this also discards the increments made just above for this batch).
        if !result.events.is_empty() {
            self.state.consecutive_malformed_events = 0;
        }

        // Nothing new was read at all: no valid events and no malformed lines.
        if result.events.is_empty() && result.malformed.is_empty() {
            return Ok(false);
        }

        // Set in the publish loop near the end of this function.
        let mut has_orphans = false;

        // Validate and transform events (apply backpressure for build.done)
        let mut validated_events = Vec::new();
        let completion_topic = self.config.event_loop.completion_promise.as_str();
        let total_events = result.events.len();
        for (index, event) in result.events.into_iter().enumerate() {
            // Events without a payload are treated as having an empty one.
            let payload = event.payload.clone().unwrap_or_default();

            if event.topic == completion_topic {
                // Completion only counts when it is the final event of the batch.
                if index + 1 == total_events {
                    self.state.completion_requested = true;
                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::EventPublished {
                            topic: event.topic.clone(),
                        },
                    );
                    info!(
                        topic = %event.topic,
                        "Completion event detected in JSONL"
                    );
                } else {
                    warn!(
                        topic = %event.topic,
                        index = index,
                        total_events = total_events,
                        "Completion event ignored because it was not the last event"
                    );
                }
                // Either way the completion event is not added to `validated_events`.
                continue;
            }

            if event.topic == "build.done" {
                // Validate build.done events have backpressure evidence
                if let Some(evidence) = EventParser::parse_backpressure_evidence(&payload) {
                    if evidence.all_passed() {
                        // Mutation evidence can only produce a log warning here;
                        // the event is forwarded regardless.
                        self.warn_on_mutation_evidence(&evidence);
                        validated_events.push(Event::new(event.topic.as_str(), &payload));
                    } else {
                        // Evidence present but checks failed - synthesize build.blocked
                        warn!(
                            tests = evidence.tests_passed,
                            lint = evidence.lint_passed,
                            typecheck = evidence.typecheck_passed,
                            audit = evidence.audit_passed,
                            coverage = evidence.coverage_passed,
                            complexity = evidence.complexity_score,
                            duplication = evidence.duplication_passed,
                            performance = evidence.performance_regression,
                            specs = evidence.specs_verified,
                            "build.done rejected: backpressure checks failed"
                        );

                        // Render the optional evidence fields as text for the
                        // diagnostics record below.
                        let complexity = evidence
                            .complexity_score
                            .map(|value| format!("{value:.2}"))
                            .unwrap_or_else(|| "missing".to_string());
                        let performance = match evidence.performance_regression {
                            Some(true) => "regression".to_string(),
                            Some(false) => "pass".to_string(),
                            None => "missing".to_string(),
                        };
                        let specs = match evidence.specs_verified {
                            Some(true) => "pass".to_string(),
                            Some(false) => "fail".to_string(),
                            None => "not reported".to_string(),
                        };

                        self.diagnostics.log_orchestration(
                            self.state.iteration,
                            "jsonl",
                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                                reason: format!(
                                    "backpressure checks failed: tests={}, lint={}, typecheck={}, audit={}, coverage={}, complexity={}, duplication={}, performance={}, specs={}",
                                    evidence.tests_passed,
                                    evidence.lint_passed,
                                    evidence.typecheck_passed,
                                    evidence.audit_passed,
                                    evidence.coverage_passed,
                                    complexity,
                                    evidence.duplication_passed,
                                    performance,
                                    specs
                                ),
                            },
                        );

                        // The original payload is dropped; the replacement
                        // carries a fixed instruction text instead.
                        validated_events.push(Event::new(
                            "build.blocked",
                            "Backpressure checks failed. Fix tests/lint/typecheck/audit/coverage/complexity/duplication/specs before emitting build.done.",
                        ));
                    }
                } else {
                    // No evidence found - synthesize build.blocked
                    warn!("build.done rejected: missing backpressure evidence");

                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                            reason: "missing backpressure evidence".to_string(),
                        },
                    );

                    validated_events.push(Event::new(
                        "build.blocked",
                        "Missing backpressure evidence. Include 'tests: pass', 'lint: pass', 'typecheck: pass', 'audit: pass', 'coverage: pass', 'complexity: <score>', 'duplication: pass', 'performance: pass' (optional), 'specs: pass' (optional) in build.done payload.",
                    ));
                }
            } else if event.topic == "review.done" {
                // Validate review.done events have verification evidence
                if let Some(evidence) = EventParser::parse_review_evidence(&payload) {
                    if evidence.is_verified() {
                        validated_events.push(Event::new(event.topic.as_str(), &payload));
                    } else {
                        // Evidence present but checks failed - synthesize review.blocked
                        warn!(
                            tests = evidence.tests_passed,
                            build = evidence.build_passed,
                            "review.done rejected: verification checks failed"
                        );

                        self.diagnostics.log_orchestration(
                            self.state.iteration,
                            "jsonl",
                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                                reason: format!(
                                    "review verification failed: tests={}, build={}",
                                    evidence.tests_passed, evidence.build_passed
                                ),
                            },
                        );

                        validated_events.push(Event::new(
                            "review.blocked",
                            "Review verification failed. Run tests and build before emitting review.done.",
                        ));
                    }
                } else {
                    // No evidence found - synthesize review.blocked
                    warn!("review.done rejected: missing verification evidence");

                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                            reason: "missing review verification evidence".to_string(),
                        },
                    );

                    validated_events.push(Event::new(
                        "review.blocked",
                        "Missing verification evidence. Include 'tests: pass' and 'build: pass' in review.done payload.",
                    ));
                }
            } else if event.topic == "verify.passed" {
                // verify.passed must carry a quality report that meets its thresholds.
                if let Some(report) = EventParser::parse_quality_report(&payload) {
                    if report.meets_thresholds() {
                        validated_events.push(Event::new(event.topic.as_str(), &payload));
                    } else {
                        // Report present but below thresholds - synthesize verify.failed
                        let failed = report.failed_dimensions();
                        let reason = if failed.is_empty() {
                            "quality thresholds failed".to_string()
                        } else {
                            format!("quality thresholds failed: {}", failed.join(", "))
                        };

                        warn!(
                            failed_dimensions = ?failed,
                            "verify.passed rejected: quality thresholds failed"
                        );

                        self.diagnostics.log_orchestration(
                            self.state.iteration,
                            "jsonl",
                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                                reason,
                            },
                        );

                        validated_events.push(Event::new(
                            "verify.failed",
                            "Quality thresholds failed. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity with thresholds in verify.passed payload.",
                        ));
                    }
                } else {
                    // No quality report found - synthesize verify.failed
                    warn!("verify.passed rejected: missing quality report");

                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                            reason: "missing quality report".to_string(),
                        },
                    );

                    validated_events.push(Event::new(
                        "verify.failed",
                        "Missing quality report. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity in verify.passed payload.",
                    ));
                }
            } else if event.topic == "verify.failed" {
                // A verify.failed without a report is only warned about; it is
                // still forwarded unchanged.
                if EventParser::parse_quality_report(&payload).is_none() {
                    warn!("verify.failed missing quality report");
                }
                validated_events.push(Event::new(event.topic.as_str(), &payload));
            } else {
                // Non-backpressure events pass through unchanged
                validated_events.push(Event::new(event.topic.as_str(), &payload));
            }
        }

        // Track build.blocked events for thrashing detection
        // (this includes the build.blocked events synthesized above).
        let blocked_events: Vec<_> = validated_events
            .iter()
            .filter(|e| e.topic == "build.blocked".into())
            .collect();

        for blocked_event in &blocked_events {
            // Block counts are keyed by the task id extracted from the payload.
            let task_id = Self::extract_task_id(&blocked_event.payload);

            let count = self
                .state
                .task_block_counts
                .entry(task_id.clone())
                .or_insert(0);
            *count += 1;

            debug!(
                task_id = %task_id,
                block_count = *count,
                "Task blocked"
            );

            // After 3 blocks on same task, emit build.task.abandoned
            // (`abandoned_tasks` keeps this from firing more than once per task).
            if *count >= 3 && !self.state.abandoned_tasks.contains(&task_id) {
                warn!(
                    task_id = %task_id,
                    "Task abandoned after 3 consecutive blocks"
                );

                self.state.abandoned_tasks.push(task_id.clone());

                self.diagnostics.log_orchestration(
                    self.state.iteration,
                    "jsonl",
                    crate::diagnostics::OrchestrationEvent::TaskAbandoned {
                        reason: format!(
                            "3 consecutive build.blocked events for task '{}'",
                            task_id
                        ),
                    },
                );

                let abandoned_event = Event::new(
                    "build.task.abandoned",
                    format!(
                        "Task '{}' abandoned after 3 consecutive build.blocked events",
                        task_id
                    ),
                );

                // Published immediately, i.e. before the validated events below.
                self.bus.publish(abandoned_event);
            }
        }

        // Track hat-level blocking for legacy thrashing detection
        let has_blocked_event = !blocked_events.is_empty();

        if has_blocked_event {
            self.state.consecutive_blocked += 1;
        } else {
            // A batch without any build.blocked event ends the streak.
            self.state.consecutive_blocked = 0;
            self.state.last_blocked_hat = None;
        }

        // Handle human.interact blocking behavior:
        // When a human.interact event is detected and robot service is active,
        // send the question and block until human.response or timeout.
        // Only the first human.interact event in the batch is considered.
        let mut response_event = None;
        let ask_human_idx = validated_events
            .iter()
            .position(|e| e.topic == "human.interact".into());

        if let Some(idx) = ask_human_idx {
            let ask_event = &validated_events[idx];
            let payload = ask_event.payload.clone();

            if let Some(ref robot_service) = self.robot_service {
                info!(
                    payload = %payload,
                    "human.interact event detected — sending question via robot service"
                );

                // Send the question (includes retry with exponential backoff)
                let send_ok = match robot_service.send_question(&payload) {
                    Ok(_message_id) => true,
                    Err(e) => {
                        warn!(
                            error = %e,
                            "Failed to send human.interact question after retries — treating as timeout"
                        );
                        // Log to diagnostics
                        // NOTE(review): `retry_count` is hard-coded to 3 here;
                        // presumably it mirrors the robot service's retry
                        // policy — confirm.
                        self.diagnostics.log_error(
                            self.state.iteration,
                            "telegram",
                            crate::diagnostics::DiagnosticError::TelegramSendError {
                                operation: "send_question".to_string(),
                                error: e.to_string(),
                                retry_count: 3,
                            },
                        );
                        false
                    }
                };

                // Block: poll events file for human.response
                // Per spec, even on send failure we treat as timeout (continue without blocking)
                if send_ok {
                    // Read the active events path from the current-events marker,
                    // falling back to the default events.jsonl if not available.
                    // Resolution order:
                    //   1. the loop context's marker file, joined onto its workspace;
                    //   2. `.ralph/current-events`, used as a relative path;
                    //   3. the loop context's `events_path()`, or `.ralph/events.jsonl`
                    //      when there is no loop context.
                    let events_path = self
                        .loop_context
                        .as_ref()
                        .and_then(|ctx| {
                            std::fs::read_to_string(ctx.current_events_marker())
                                .ok()
                                .map(|s| ctx.workspace().join(s.trim()))
                        })
                        .or_else(|| {
                            std::fs::read_to_string(".ralph/current-events")
                                .ok()
                                .map(|s| PathBuf::from(s.trim()))
                        })
                        .unwrap_or_else(|| {
                            self.loop_context
                                .as_ref()
                                .map(|ctx| ctx.events_path())
                                .unwrap_or_else(|| PathBuf::from(".ralph/events.jsonl"))
                        });

                    // Timeouts and errors are logged and then ignored: the loop
                    // continues without a response event.
                    match robot_service.wait_for_response(&events_path) {
                        Ok(Some(response)) => {
                            info!(
                                response = %response,
                                "Received human.response — continuing loop"
                            );
                            // Create a human.response event to inject into the bus
                            response_event = Some(Event::new("human.response", &response));
                        }
                        Ok(None) => {
                            warn!(
                                timeout_secs = robot_service.timeout_secs(),
                                "Human response timeout — continuing without response"
                            );
                        }
                        Err(e) => {
                            warn!(
                                error = %e,
                                "Error waiting for human response — continuing without response"
                            );
                        }
                    }
                }
            } else {
                debug!(
                    "human.interact event detected but no robot service active — passing through"
                );
            }
        }

        // Publish validated events to the bus.
        // Ralph is always registered with subscribe("*"), so every event has at least
        // one subscriber. Events without a specific hat subscriber are "orphaned" —
        // Ralph handles them as the universal fallback.
        for event in validated_events {
            self.diagnostics.log_orchestration(
                self.state.iteration,
                "jsonl",
                crate::diagnostics::OrchestrationEvent::EventPublished {
                    topic: event.topic.to_string(),
                },
            );

            // A single event without a registry subscriber is enough to
            // make this call return `true`.
            if !self.registry.has_subscriber(event.topic.as_str()) {
                has_orphans = true;
            }

            debug!(
                topic = %event.topic,
                "Publishing event from JSONL"
            );
            self.bus.publish(event);
        }

        // Publish human.response event if one was received during blocking
        // (after the validated events, so it follows the human.interact it answers).
        if let Some(response) = response_event {
            info!(
                topic = %response.topic,
                "Publishing human.response event from robot service"
            );
            self.bus.publish(response);
        }

        Ok(has_orphans)
    }
2088
2089    /// Checks if output contains a completion event from Ralph.
2090    ///
2091    /// Completion must be emitted as an `<event>` tag, not plain text.
2092    pub fn check_ralph_completion(&self, output: &str) -> bool {
2093        let events = EventParser::new().parse(output);
2094        events
2095            .iter()
2096            .any(|event| event.topic.as_str() == self.config.event_loop.completion_promise)
2097    }
2098
2099    /// Publishes the loop.terminate system event to observers.
2100    ///
2101    /// Per spec: "Published by the orchestrator (not agents) when the loop exits."
2102    /// This is an observer-only event—hats cannot trigger on it.
2103    ///
2104    /// Returns the event for logging purposes.
2105    pub fn publish_terminate_event(&mut self, reason: &TerminationReason) -> Event {
2106        // Stop the robot service if it was running
2107        self.stop_robot_service();
2108
2109        let elapsed = self.state.elapsed();
2110        let duration_str = format_duration(elapsed);
2111
2112        let payload = format!(
2113            "## Reason\n{}\n\n## Status\n{}\n\n## Summary\n- Iterations: {}\n- Duration: {}\n- Exit code: {}",
2114            reason.as_str(),
2115            termination_status_text(reason),
2116            self.state.iteration,
2117            duration_str,
2118            reason.exit_code()
2119        );
2120
2121        let event = Event::new("loop.terminate", &payload);
2122
2123        // Publish to bus for observers (but no hat can trigger on this)
2124        self.bus.publish(event.clone());
2125
2126        info!(
2127            reason = %reason.as_str(),
2128            iterations = self.state.iteration,
2129            duration = %duration_str,
2130            "Wrapping up: {}. {} iterations in {}.",
2131            reason.as_str(),
2132            self.state.iteration,
2133            duration_str
2134        );
2135
2136        event
2137    }
2138
2139    /// Returns the robot service's shutdown flag, if active.
2140    ///
2141    /// Signal handlers can set this flag to interrupt `wait_for_response()`
2142    /// without waiting for the full timeout.
2143    pub fn robot_shutdown_flag(&self) -> Option<Arc<AtomicBool>> {
2144        self.robot_service.as_ref().map(|s| s.shutdown_flag())
2145    }
2146
2147    /// Stops the robot service if it's running.
2148    ///
2149    /// Called during loop termination to cleanly shut down the communication backend.
2150    fn stop_robot_service(&mut self) {
2151        if let Some(service) = self.robot_service.take() {
2152            service.stop();
2153        }
2154    }
2155
2156    // -------------------------------------------------------------------------
2157    // Human-in-the-loop planning support
2158    // -------------------------------------------------------------------------
2159
2160    /// Check if any event is a `user.prompt` event.
2161    ///
2162    /// Returns the first user prompt event found, or None.
2163    pub fn check_for_user_prompt(&self, events: &[Event]) -> Option<UserPrompt> {
2164        events
2165            .iter()
2166            .find(|e| e.topic.as_str() == "user.prompt")
2167            .map(|e| UserPrompt {
2168                id: Self::extract_prompt_id(&e.payload),
2169                text: e.payload.clone(),
2170            })
2171    }
2172
2173    /// Extract a prompt ID from the event payload.
2174    ///
2175    /// Supports both XML attribute format: `<event topic="user.prompt" id="q1">...</event>`
2176    /// and JSON format in payload.
2177    fn extract_prompt_id(payload: &str) -> String {
2178        // Try to extract id attribute from XML-like format first
2179        if let Some(start) = payload.find("id=\"")
2180            && let Some(end) = payload[start + 4..].find('"')
2181        {
2182            return payload[start + 4..start + 4 + end].to_string();
2183        }
2184
2185        // Fallback: generate a simple ID based on timestamp
2186        format!("q{}", Self::generate_prompt_id())
2187    }
2188
2189    /// Generate a simple unique ID for prompts.
2190    /// Uses timestamp-based generation since uuid crate isn't available.
2191    fn generate_prompt_id() -> String {
2192        use std::time::{SystemTime, UNIX_EPOCH};
2193        let nanos = SystemTime::now()
2194            .duration_since(UNIX_EPOCH)
2195            .unwrap()
2196            .as_nanos();
2197        format!("{:x}", nanos % 0xFFFF_FFFF)
2198    }
2199}
2200
/// A user prompt that requires human input.
///
/// Created when the agent emits a `user.prompt` event during planning.
#[derive(Debug, Clone)]
pub struct UserPrompt {
    /// Identifier for this prompt (e.g., "q1", "q2").
    pub id: String,
    /// The prompt/question text.
    pub text: String,
}
2211
/// Formats a duration as a human-readable string with whole-second precision.
///
/// Leading zero units are omitted: `"5s"`, `"2m 5s"`, `"1h 0m 5s"`.
fn format_duration(d: Duration) -> String {
    let secs = d.as_secs();
    let (hours, minutes, seconds) = (secs / 3600, (secs % 3600) / 60, secs % 60);

    match (hours, minutes) {
        (0, 0) => format!("{}s", seconds),
        (0, _) => format!("{}m {}s", minutes, seconds),
        _ => format!("{}h {}m {}s", hours, minutes, seconds),
    }
}
2227
2228/// Returns a human-readable status based on termination reason.
2229fn termination_status_text(reason: &TerminationReason) -> &'static str {
2230    match reason {
2231        TerminationReason::CompletionPromise => "All tasks completed successfully.",
2232        TerminationReason::MaxIterations => "Stopped at iteration limit.",
2233        TerminationReason::MaxRuntime => "Stopped at runtime limit.",
2234        TerminationReason::MaxCost => "Stopped at cost limit.",
2235        TerminationReason::ConsecutiveFailures => "Too many consecutive failures.",
2236        TerminationReason::LoopThrashing => {
2237            "Loop thrashing detected - same hat repeatedly blocked."
2238        }
2239        TerminationReason::ValidationFailure => "Too many consecutive malformed JSONL events.",
2240        TerminationReason::Stopped => "Manually stopped.",
2241        TerminationReason::Interrupted => "Interrupted by signal.",
2242        TerminationReason::RestartRequested => "Restarting by human request.",
2243    }
2244}