Skip to main content

ralph_core/event_loop/
mod.rs

1//! Event loop orchestration.
2//!
3//! The event loop coordinates the execution of hats via pub/sub messaging.
4
5mod loop_state;
6#[cfg(test)]
7mod tests;
8
9pub use loop_state::LoopState;
10
11use crate::config::{HatBackend, InjectMode, RalphConfig};
12use crate::event_parser::{EventParser, MutationEvidence, MutationStatus};
13use crate::event_reader::EventReader;
14use crate::hat_registry::HatRegistry;
15use crate::hatless_ralph::HatlessRalph;
16use crate::instructions::InstructionBuilder;
17use crate::loop_context::LoopContext;
18use crate::memory_store::{MarkdownMemoryStore, format_memories_as_markdown, truncate_to_budget};
19use crate::skill_registry::SkillRegistry;
20use crate::text::floor_char_boundary;
21use ralph_proto::{CheckinContext, Event, EventBus, Hat, HatId, RobotService};
22use std::path::PathBuf;
23use std::sync::Arc;
24use std::sync::atomic::AtomicBool;
25use std::time::Duration;
26use tracing::{debug, info, warn};
27
28/// Reason the event loop terminated.
29#[derive(Debug, Clone, PartialEq, Eq)]
30pub enum TerminationReason {
31    /// Completion promise was detected in output.
32    CompletionPromise,
33    /// Maximum iterations reached.
34    MaxIterations,
35    /// Maximum runtime exceeded.
36    MaxRuntime,
37    /// Maximum cost exceeded.
38    MaxCost,
39    /// Too many consecutive failures.
40    ConsecutiveFailures,
41    /// Loop thrashing detected (repeated blocked events).
42    LoopThrashing,
43    /// Too many consecutive malformed JSONL lines in events file.
44    ValidationFailure,
45    /// Manually stopped.
46    Stopped,
47    /// Interrupted by signal (SIGINT/SIGTERM).
48    Interrupted,
49    /// Restart requested via Telegram `/restart` command.
50    RestartRequested,
51}
52
53impl TerminationReason {
54    /// Returns the exit code for this termination reason per spec.
55    ///
56    /// Per spec "Loop Termination" section:
57    /// - 0: Completion promise detected (success)
58    /// - 1: Consecutive failures or unrecoverable error (failure)
59    /// - 2: Max iterations, max runtime, or max cost exceeded (limit)
60    /// - 130: User interrupt (SIGINT = 128 + 2)
61    pub fn exit_code(&self) -> i32 {
62        match self {
63            TerminationReason::CompletionPromise => 0,
64            TerminationReason::ConsecutiveFailures
65            | TerminationReason::LoopThrashing
66            | TerminationReason::ValidationFailure
67            | TerminationReason::Stopped => 1,
68            TerminationReason::MaxIterations
69            | TerminationReason::MaxRuntime
70            | TerminationReason::MaxCost => 2,
71            TerminationReason::Interrupted => 130,
72            // Restart uses exit code 3 to signal the caller to exec-replace
73            TerminationReason::RestartRequested => 3,
74        }
75    }
76
77    /// Returns the reason string for use in loop.terminate event payload.
78    ///
79    /// Per spec event payload format:
80    /// `completed | max_iterations | max_runtime | consecutive_failures | interrupted | error`
81    pub fn as_str(&self) -> &'static str {
82        match self {
83            TerminationReason::CompletionPromise => "completed",
84            TerminationReason::MaxIterations => "max_iterations",
85            TerminationReason::MaxRuntime => "max_runtime",
86            TerminationReason::MaxCost => "max_cost",
87            TerminationReason::ConsecutiveFailures => "consecutive_failures",
88            TerminationReason::LoopThrashing => "loop_thrashing",
89            TerminationReason::ValidationFailure => "validation_failure",
90            TerminationReason::Stopped => "stopped",
91            TerminationReason::Interrupted => "interrupted",
92            TerminationReason::RestartRequested => "restart_requested",
93        }
94    }
95
96    /// Returns true if this is a successful completion (not an error or limit).
97    pub fn is_success(&self) -> bool {
98        matches!(self, TerminationReason::CompletionPromise)
99    }
100}
101
102/// The main event loop orchestrator.
103pub struct EventLoop {
104    config: RalphConfig,
105    registry: HatRegistry,
106    bus: EventBus,
107    state: LoopState,
108    instruction_builder: InstructionBuilder,
109    ralph: HatlessRalph,
110    /// Cached human guidance messages that should persist across iterations.
111    robot_guidance: Vec<String>,
112    /// Event reader for consuming events from JSONL file.
113    /// Made pub(crate) to allow tests to override the path.
114    pub(crate) event_reader: EventReader,
115    diagnostics: crate::diagnostics::DiagnosticsCollector,
116    /// Loop context for path resolution (None for legacy single-loop mode).
117    loop_context: Option<LoopContext>,
118    /// Skill registry for the current loop.
119    skill_registry: SkillRegistry,
120    /// Robot service for human-in-the-loop communication.
121    /// Injected externally when `human.enabled` is true and this is the primary loop.
122    robot_service: Option<Box<dyn RobotService>>,
123}
124
125impl EventLoop {
126    /// Creates a new event loop from configuration.
127    pub fn new(config: RalphConfig) -> Self {
128        // Try to create diagnostics collector, but fall back to disabled if it fails
129        // (e.g., in tests without proper directory setup)
130        let diagnostics = crate::diagnostics::DiagnosticsCollector::new(std::path::Path::new("."))
131            .unwrap_or_else(|e| {
132                debug!(
133                    "Failed to initialize diagnostics: {}, using disabled collector",
134                    e
135                );
136                crate::diagnostics::DiagnosticsCollector::disabled()
137            });
138
139        Self::with_diagnostics(config, diagnostics)
140    }
141
142    /// Creates a new event loop with a loop context for path resolution.
143    ///
144    /// The loop context determines where events, tasks, and other state files
145    /// are located. Use this for multi-loop scenarios where each loop runs
146    /// in an isolated workspace (git worktree).
147    pub fn with_context(config: RalphConfig, context: LoopContext) -> Self {
148        let diagnostics = crate::diagnostics::DiagnosticsCollector::new(context.workspace())
149            .unwrap_or_else(|e| {
150                debug!(
151                    "Failed to initialize diagnostics: {}, using disabled collector",
152                    e
153                );
154                crate::diagnostics::DiagnosticsCollector::disabled()
155            });
156
157        Self::with_context_and_diagnostics(config, context, diagnostics)
158    }
159
160    /// Creates a new event loop with explicit loop context and diagnostics.
161    pub fn with_context_and_diagnostics(
162        config: RalphConfig,
163        context: LoopContext,
164        diagnostics: crate::diagnostics::DiagnosticsCollector,
165    ) -> Self {
166        let registry = HatRegistry::from_config(&config);
167        let instruction_builder =
168            InstructionBuilder::with_events(config.core.clone(), config.events.clone());
169
170        let mut bus = EventBus::new();
171
172        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
173        // Ralph is ALWAYS registered as the universal fallback for orphaned events.
174        // Custom hats are registered first (higher priority), Ralph catches everything else.
175        for hat in registry.all() {
176            bus.register(hat.clone());
177        }
178
179        // Always register Ralph as catch-all coordinator
180        // Per spec: "Ralph runs when no hat triggered — Universal fallback for orphaned events"
181        let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); // Subscribe to all events
182        bus.register(ralph_hat);
183
184        if registry.is_empty() {
185            debug!("Solo mode: Ralph is the only coordinator");
186        } else {
187            debug!(
188                "Multi-hat mode: {} custom hats + Ralph as fallback",
189                registry.len()
190            );
191        }
192
193        // Build skill registry from config
194        let skill_registry = if config.skills.enabled {
195            SkillRegistry::from_config(
196                &config.skills,
197                context.workspace(),
198                Some(config.cli.backend.as_str()),
199            )
200            .unwrap_or_else(|e| {
201                warn!(
202                    "Failed to build skill registry: {}, using empty registry",
203                    e
204                );
205                SkillRegistry::new(Some(config.cli.backend.as_str()))
206            })
207        } else {
208            SkillRegistry::new(Some(config.cli.backend.as_str()))
209        };
210
211        let skill_index = if config.skills.enabled {
212            skill_registry.build_index(None)
213        } else {
214            String::new()
215        };
216
217        // When memories are enabled, add tasks CLI instructions alongside scratchpad
218        let ralph = HatlessRalph::new(
219            config.event_loop.completion_promise.clone(),
220            config.core.clone(),
221            &registry,
222            config.event_loop.starting_event.clone(),
223        )
224        .with_memories_enabled(config.memories.enabled)
225        .with_skill_index(skill_index);
226
227        // Read timestamped events path from marker file, fall back to default
228        // The marker file contains a relative path like ".ralph/events-20260127-123456.jsonl"
229        // which we resolve relative to the workspace root
230        let events_path = std::fs::read_to_string(context.current_events_marker())
231            .map(|s| {
232                let relative = s.trim();
233                context.workspace().join(relative)
234            })
235            .unwrap_or_else(|_| context.events_path());
236        let event_reader = EventReader::new(&events_path);
237
238        Self {
239            config,
240            registry,
241            bus,
242            state: LoopState::new(),
243            instruction_builder,
244            ralph,
245            robot_guidance: Vec::new(),
246            event_reader,
247            diagnostics,
248            loop_context: Some(context),
249            skill_registry,
250            robot_service: None,
251        }
252    }
253
254    /// Creates a new event loop with explicit diagnostics collector (for testing).
255    pub fn with_diagnostics(
256        config: RalphConfig,
257        diagnostics: crate::diagnostics::DiagnosticsCollector,
258    ) -> Self {
259        let registry = HatRegistry::from_config(&config);
260        let instruction_builder =
261            InstructionBuilder::with_events(config.core.clone(), config.events.clone());
262
263        let mut bus = EventBus::new();
264
265        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
266        // Ralph is ALWAYS registered as the universal fallback for orphaned events.
267        // Custom hats are registered first (higher priority), Ralph catches everything else.
268        for hat in registry.all() {
269            bus.register(hat.clone());
270        }
271
272        // Always register Ralph as catch-all coordinator
273        // Per spec: "Ralph runs when no hat triggered — Universal fallback for orphaned events"
274        let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); // Subscribe to all events
275        bus.register(ralph_hat);
276
277        if registry.is_empty() {
278            debug!("Solo mode: Ralph is the only coordinator");
279        } else {
280            debug!(
281                "Multi-hat mode: {} custom hats + Ralph as fallback",
282                registry.len()
283            );
284        }
285
286        // Build skill registry from config
287        let workspace_root = std::path::Path::new(".");
288        let skill_registry = if config.skills.enabled {
289            SkillRegistry::from_config(
290                &config.skills,
291                workspace_root,
292                Some(config.cli.backend.as_str()),
293            )
294            .unwrap_or_else(|e| {
295                warn!(
296                    "Failed to build skill registry: {}, using empty registry",
297                    e
298                );
299                SkillRegistry::new(Some(config.cli.backend.as_str()))
300            })
301        } else {
302            SkillRegistry::new(Some(config.cli.backend.as_str()))
303        };
304
305        let skill_index = if config.skills.enabled {
306            skill_registry.build_index(None)
307        } else {
308            String::new()
309        };
310
311        // When memories are enabled, add tasks CLI instructions alongside scratchpad
312        let ralph = HatlessRalph::new(
313            config.event_loop.completion_promise.clone(),
314            config.core.clone(),
315            &registry,
316            config.event_loop.starting_event.clone(),
317        )
318        .with_memories_enabled(config.memories.enabled)
319        .with_skill_index(skill_index);
320
321        // Read events path from marker file, fall back to default if not present
322        // The marker file is written by run_loop_impl() at run startup
323        let events_path = std::fs::read_to_string(".ralph/current-events")
324            .map(|s| s.trim().to_string())
325            .unwrap_or_else(|_| ".ralph/events.jsonl".to_string());
326        let event_reader = EventReader::new(&events_path);
327
328        Self {
329            config,
330            registry,
331            bus,
332            state: LoopState::new(),
333            instruction_builder,
334            ralph,
335            robot_guidance: Vec::new(),
336            event_reader,
337            diagnostics,
338            loop_context: None,
339            skill_registry,
340            robot_service: None,
341        }
342    }
343
344    /// Injects a robot service for human-in-the-loop communication.
345    ///
346    /// Call this after construction to enable `human.interact` event handling,
347    /// periodic check-ins, and question/response flow. The service is typically
348    /// created by the CLI layer (e.g., `TelegramService`) and injected here,
349    /// keeping the core event loop decoupled from any specific communication
350    /// platform.
351    pub fn set_robot_service(&mut self, service: Box<dyn RobotService>) {
352        self.robot_service = Some(service);
353    }
354
355    /// Returns the loop context, if one was provided.
356    pub fn loop_context(&self) -> Option<&LoopContext> {
357        self.loop_context.as_ref()
358    }
359
360    /// Returns the tasks path based on loop context or default.
361    fn tasks_path(&self) -> PathBuf {
362        self.loop_context
363            .as_ref()
364            .map(|ctx| ctx.tasks_path())
365            .unwrap_or_else(|| PathBuf::from(".ralph/agent/tasks.jsonl"))
366    }
367
368    /// Returns the scratchpad path based on loop context or config.
369    fn scratchpad_path(&self) -> PathBuf {
370        self.loop_context
371            .as_ref()
372            .map(|ctx| ctx.scratchpad_path())
373            .unwrap_or_else(|| PathBuf::from(&self.config.core.scratchpad))
374    }
375
376    /// Returns the current loop state.
377    pub fn state(&self) -> &LoopState {
378        &self.state
379    }
380
381    /// Returns the configuration.
382    pub fn config(&self) -> &RalphConfig {
383        &self.config
384    }
385
386    /// Returns the hat registry.
387    pub fn registry(&self) -> &HatRegistry {
388        &self.registry
389    }
390
391    /// Gets the backend configuration for a hat.
392    ///
393    /// If the hat has a backend configured, returns that.
394    /// Otherwise, returns None (caller should use global backend).
395    pub fn get_hat_backend(&self, hat_id: &HatId) -> Option<&HatBackend> {
396        self.registry
397            .get_config(hat_id)
398            .and_then(|config| config.backend.as_ref())
399    }
400
401    /// Adds an observer that receives all published events.
402    ///
403    /// Multiple observers can be added (e.g., session recorder + TUI).
404    /// Each observer is called before events are routed to subscribers.
405    pub fn add_observer<F>(&mut self, observer: F)
406    where
407        F: Fn(&Event) + Send + 'static,
408    {
409        self.bus.add_observer(observer);
410    }
411
412    /// Sets a single observer, clearing any existing observers.
413    ///
414    /// Prefer `add_observer` when multiple observers are needed.
415    #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
416    pub fn set_observer<F>(&mut self, observer: F)
417    where
418        F: Fn(&Event) + Send + 'static,
419    {
420        #[allow(deprecated)]
421        self.bus.set_observer(observer);
422    }
423
424    /// Checks if any termination condition is met.
425    pub fn check_termination(&self) -> Option<TerminationReason> {
426        let cfg = &self.config.event_loop;
427
428        if self.state.iteration >= cfg.max_iterations {
429            return Some(TerminationReason::MaxIterations);
430        }
431
432        if self.state.elapsed().as_secs() >= cfg.max_runtime_seconds {
433            return Some(TerminationReason::MaxRuntime);
434        }
435
436        if let Some(max_cost) = cfg.max_cost_usd
437            && self.state.cumulative_cost >= max_cost
438        {
439            return Some(TerminationReason::MaxCost);
440        }
441
442        if self.state.consecutive_failures >= cfg.max_consecutive_failures {
443            return Some(TerminationReason::ConsecutiveFailures);
444        }
445
446        // Check for loop thrashing: planner keeps dispatching abandoned tasks
447        if self.state.abandoned_task_redispatches >= 3 {
448            return Some(TerminationReason::LoopThrashing);
449        }
450
451        // Check for validation failures: too many consecutive malformed JSONL lines
452        if self.state.consecutive_malformed_events >= 3 {
453            return Some(TerminationReason::ValidationFailure);
454        }
455
456        // Check for stop signal from Telegram /stop or CLI stop-requested
457        let stop_path =
458            std::path::Path::new(&self.config.core.workspace_root).join(".ralph/stop-requested");
459        if stop_path.exists() {
460            let _ = std::fs::remove_file(&stop_path);
461            return Some(TerminationReason::Stopped);
462        }
463
464        // Check for restart signal from Telegram /restart command
465        let restart_path =
466            std::path::Path::new(&self.config.core.workspace_root).join(".ralph/restart-requested");
467        if restart_path.exists() {
468            return Some(TerminationReason::RestartRequested);
469        }
470
471        None
472    }
473
474    /// Checks if a completion event was received and returns termination reason.
475    ///
476    /// Completion is only accepted via JSONL events (e.g., `ralph emit`).
477    pub fn check_completion_event(&mut self) -> Option<TerminationReason> {
478        if !self.state.completion_requested {
479            return None;
480        }
481
482        self.state.completion_requested = false;
483
484        // In persistent mode, suppress completion and keep the loop alive
485        if self.config.event_loop.persistent {
486            info!("Completion event suppressed - persistent mode active, loop staying alive");
487
488            self.diagnostics.log_orchestration(
489                self.state.iteration,
490                "loop",
491                crate::diagnostics::OrchestrationEvent::LoopTerminated {
492                    reason: "completion_event_suppressed_persistent".to_string(),
493                },
494            );
495
496            // Inject a task.resume event so the loop continues with an idle prompt
497            let resume_event = Event::new(
498                "task.resume",
499                "Persistent mode: loop staying alive after completion signal. \
500                 Check for new tasks or await human guidance.",
501            );
502            self.bus.publish(resume_event);
503
504            return None;
505        }
506
507        // Log warning if tasks remain open (informational only)
508        if self.config.memories.enabled {
509            if let Ok(false) = self.verify_tasks_complete() {
510                let open_tasks = self.get_open_task_list();
511                warn!(
512                    open_tasks = ?open_tasks,
513                    "Completion event with {} open task(s) - trusting agent decision",
514                    open_tasks.len()
515                );
516            }
517        } else if let Ok(false) = self.verify_scratchpad_complete() {
518            warn!("Completion event with pending scratchpad tasks - trusting agent decision");
519        }
520
521        info!("Completion event detected - terminating");
522
523        // Log loop terminated
524        self.diagnostics.log_orchestration(
525            self.state.iteration,
526            "loop",
527            crate::diagnostics::OrchestrationEvent::LoopTerminated {
528                reason: "completion_event".to_string(),
529            },
530        );
531
532        Some(TerminationReason::CompletionPromise)
533    }
534
535    /// Initializes the loop by publishing the start event.
536    pub fn initialize(&mut self, prompt_content: &str) {
537        // Use configured starting_event or default to task.start for backward compatibility
538        let topic = self
539            .config
540            .event_loop
541            .starting_event
542            .clone()
543            .unwrap_or_else(|| "task.start".to_string());
544        self.initialize_with_topic(&topic, prompt_content);
545    }
546
547    /// Initializes the loop for resume mode by publishing task.resume.
548    ///
549    /// Per spec: "User can run `ralph resume` to restart reading existing scratchpad."
550    /// The planner should read the existing scratchpad rather than doing fresh gap analysis.
551    pub fn initialize_resume(&mut self, prompt_content: &str) {
552        // Resume always uses task.resume regardless of starting_event config
553        self.initialize_with_topic("task.resume", prompt_content);
554    }
555
556    /// Common initialization logic with configurable topic.
557    fn initialize_with_topic(&mut self, topic: &str, prompt_content: &str) {
558        // Store the objective so it persists across all iterations.
559        // After iteration 1, bus.take_pending() consumes the start event,
560        // so without this the objective would be invisible to later hats.
561        self.ralph.set_objective(prompt_content.to_string());
562
563        let start_event = Event::new(topic, prompt_content);
564        self.bus.publish(start_event);
565        debug!(topic = topic, "Published {} event", topic);
566    }
567
568    /// Gets the next hat to execute (if any have pending events).
569    ///
570    /// Per "Hatless Ralph" architecture: When custom hats are defined, Ralph is
571    /// always the executor. Custom hats define topology (pub/sub contracts) that
572    /// Ralph uses for coordination context, but Ralph handles all iterations.
573    ///
574    /// - Solo mode (no custom hats): Returns "ralph" if Ralph has pending events
575    /// - Multi-hat mode (custom hats defined): Always returns "ralph" if ANY hat has pending events
576    pub fn next_hat(&self) -> Option<&HatId> {
577        let next = self.bus.next_hat_with_pending();
578
579        // If no pending hat events but human interactions are pending, route to Ralph.
580        if next.is_none() && self.bus.has_human_pending() {
581            return self.bus.hat_ids().find(|id| id.as_str() == "ralph");
582        }
583
584        // If no pending events, return None
585        next.as_ref()?;
586
587        // In multi-hat mode, always route to Ralph (custom hats define topology only)
588        // Ralph's prompt includes the ## HATS section for coordination awareness
589        if self.registry.is_empty() {
590            // Solo mode - return the next hat (which is "ralph")
591            next
592        } else {
593            // Return "ralph" - the constant coordinator
594            // Find ralph in the bus's registered hats
595            self.bus.hat_ids().find(|id| id.as_str() == "ralph")
596        }
597    }
598
599    /// Checks if any hats have pending events.
600    ///
601    /// Use this after `process_output` to detect if the LLM failed to publish an event.
602    /// If false after processing, the loop will terminate on the next iteration.
603    pub fn has_pending_events(&self) -> bool {
604        self.bus.next_hat_with_pending().is_some() || self.bus.has_human_pending()
605    }
606
607    /// Checks if any pending events are human-related (human.response, human.guidance).
608    ///
609    /// Used to skip cooldown delays when a human event is next, since we don't
610    /// want to artificially delay the response to a human interaction.
611    pub fn has_pending_human_events(&self) -> bool {
612        self.bus.has_human_pending()
613    }
614
615    /// Gets the topics a hat is allowed to publish.
616    ///
617    /// Used to build retry prompts when the LLM forgets to publish an event.
618    pub fn get_hat_publishes(&self, hat_id: &HatId) -> Vec<String> {
619        self.registry
620            .get(hat_id)
621            .map(|hat| hat.publishes.iter().map(|t| t.to_string()).collect())
622            .unwrap_or_default()
623    }
624
625    /// Injects a fallback event to recover from a stalled loop.
626    ///
627    /// When no hats have pending events (agent failed to publish), this method
628    /// injects a `task.resume` event which Ralph will handle to attempt recovery.
629    ///
630    /// Returns true if a fallback event was injected, false if recovery is not possible.
631    pub fn inject_fallback_event(&mut self) -> bool {
632        let fallback_event = Event::new(
633            "task.resume",
634            "RECOVERY: Previous iteration did not publish an event. \
635             Review the scratchpad and either dispatch the next task or complete the loop.",
636        );
637
638        // If a custom hat was last executing, target the fallback back to it
639        // This preserves hat context instead of always falling back to Ralph
640        let fallback_event = match &self.state.last_hat {
641            Some(hat_id) if hat_id.as_str() != "ralph" => {
642                debug!(
643                    hat = %hat_id.as_str(),
644                    "Injecting fallback event to recover - targeting last hat with task.resume"
645                );
646                fallback_event.with_target(hat_id.clone())
647            }
648            _ => {
649                debug!("Injecting fallback event to recover - triggering Ralph with task.resume");
650                fallback_event
651            }
652        };
653
654        self.bus.publish(fallback_event);
655        true
656    }
657
658    /// Builds the prompt for a hat's execution.
659    ///
660    /// Per "Hatless Ralph" architecture:
661    /// - Solo mode: Ralph handles everything with his own prompt
662    /// - Multi-hat mode: Ralph is the sole executor, custom hats define topology only
663    ///
664    /// When in multi-hat mode, this method collects ALL pending events across all hats
665    /// and builds Ralph's prompt with that context. The `## HATS` section in Ralph's
666    /// prompt documents the topology for coordination awareness.
667    ///
668    /// If memories are configured with `inject: auto`, this method also prepends
669    /// primed memories to the prompt context. If a scratchpad file exists and is
670    /// non-empty, its content is also prepended (before memories).
671    pub fn build_prompt(&mut self, hat_id: &HatId) -> Option<String> {
672        // Handle "ralph" hat - the constant coordinator
673        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
674        if hat_id.as_str() == "ralph" {
675            if self.registry.is_empty() {
676                // Solo mode - just Ralph's events, no hats to filter
677                let mut events = self.bus.take_pending(&hat_id.clone());
678                let mut human_events = self.bus.take_human_pending();
679                events.append(&mut human_events);
680
681                // Separate human.guidance events from regular events
682                let (guidance_events, regular_events): (Vec<_>, Vec<_>) = events
683                    .into_iter()
684                    .partition(|e| e.topic.as_str() == "human.guidance");
685
686                let events_context = regular_events
687                    .iter()
688                    .map(|e| Self::format_event(e))
689                    .collect::<Vec<_>>()
690                    .join("\n");
691
692                // Persist and inject human guidance into prompt if present
693                self.update_robot_guidance(guidance_events);
694                self.apply_robot_guidance();
695
696                // Build base prompt and prepend memories + scratchpad + ready tasks
697                let base_prompt = self.ralph.build_prompt(&events_context, &[]);
698                self.ralph.clear_robot_guidance();
699                let with_skills = self.prepend_auto_inject_skills(base_prompt);
700                let with_scratchpad = self.prepend_scratchpad(with_skills);
701                let final_prompt = self.prepend_ready_tasks(with_scratchpad);
702
703                debug!("build_prompt: routing to HatlessRalph (solo mode)");
704                return Some(final_prompt);
705            } else {
706                // Multi-hat mode: collect events and determine active hats
707                let mut all_hat_ids: Vec<HatId> = self.bus.hat_ids().cloned().collect();
708                // Deterministic ordering (avoid HashMap iteration order nondeterminism).
709                all_hat_ids.sort_by(|a, b| a.as_str().cmp(b.as_str()));
710
711                let mut all_events = Vec::new();
712                let mut system_events = Vec::new();
713
714                for id in &all_hat_ids {
715                    let pending = self.bus.take_pending(id);
716                    if pending.is_empty() {
717                        continue;
718                    }
719
720                    let (drop_pending, exhausted_event) = self.check_hat_exhaustion(id, &pending);
721                    if drop_pending {
722                        // Drop the pending events that would have activated the hat.
723                        if let Some(exhausted_event) = exhausted_event {
724                            all_events.push(exhausted_event.clone());
725                            system_events.push(exhausted_event);
726                        }
727                        continue;
728                    }
729
730                    all_events.extend(pending);
731                }
732
733                let mut human_events = self.bus.take_human_pending();
734                all_events.append(&mut human_events);
735
736                // Publish orchestrator-generated system events after consuming pending events,
737                // so they become visible in the event log and can be handled next iteration.
738                for event in system_events {
739                    self.bus.publish(event);
740                }
741
742                // Separate human.guidance events from regular events
743                let (guidance_events, regular_events): (Vec<_>, Vec<_>) = all_events
744                    .into_iter()
745                    .partition(|e| e.topic.as_str() == "human.guidance");
746
747                // Persist and inject human guidance before building prompt (must happen before
748                // immutable borrows from determine_active_hats)
749                self.update_robot_guidance(guidance_events);
750                self.apply_robot_guidance();
751
752                // Determine which hats are active based on regular events
753                let active_hat_ids = self.determine_active_hat_ids(&regular_events);
754                self.record_hat_activations(&active_hat_ids);
755                let active_hats = self.determine_active_hats(&regular_events);
756
757                // Format events for context
758                let events_context = regular_events
759                    .iter()
760                    .map(|e| Self::format_event(e))
761                    .collect::<Vec<_>>()
762                    .join("\n");
763
764                // Build base prompt and prepend memories + scratchpad if available
765                let base_prompt = self.ralph.build_prompt(&events_context, &active_hats);
766
767                // Build prompt with active hats - filters instructions to only active hats
768                debug!(
769                    "build_prompt: routing to HatlessRalph (multi-hat coordinator mode), active_hats: {:?}",
770                    active_hats
771                        .iter()
772                        .map(|h| h.id.as_str())
773                        .collect::<Vec<_>>()
774                );
775
776                // Clear guidance after active_hats references are no longer needed
777                self.ralph.clear_robot_guidance();
778                let with_skills = self.prepend_auto_inject_skills(base_prompt);
779                let with_scratchpad = self.prepend_scratchpad(with_skills);
780                let final_prompt = self.prepend_ready_tasks(with_scratchpad);
781
782                return Some(final_prompt);
783            }
784        }
785
786        // Non-ralph hat requested - this shouldn't happen in multi-hat mode since
787        // next_hat() always returns "ralph" when custom hats are defined.
788        // But we keep this code path for backward compatibility and tests.
789        let events = self.bus.take_pending(&hat_id.clone());
790        let events_context = events
791            .iter()
792            .map(|e| Self::format_event(e))
793            .collect::<Vec<_>>()
794            .join("\n");
795
796        let hat = self.registry.get(hat_id)?;
797
798        // Debug logging to trace hat routing
799        debug!(
800            "build_prompt: hat_id='{}', instructions.is_empty()={}",
801            hat_id.as_str(),
802            hat.instructions.is_empty()
803        );
804
805        // All hats use build_custom_hat with ghuntley-style prompts
806        debug!(
807            "build_prompt: routing to build_custom_hat() for '{}'",
808            hat_id.as_str()
809        );
810        Some(
811            self.instruction_builder
812                .build_custom_hat(hat, &events_context),
813        )
814    }
815
816    /// Stores guidance payloads, persists them to scratchpad, and prepares them for prompt injection.
817    ///
818    /// Guidance events are ephemeral in the event bus (consumed by `take_pending`).
819    /// This method both caches them in memory for prompt injection and appends
820    /// them to the scratchpad file so they survive across process restarts.
821    fn update_robot_guidance(&mut self, guidance_events: Vec<Event>) {
822        if guidance_events.is_empty() {
823            return;
824        }
825
826        // Persist new guidance to scratchpad before caching
827        self.persist_guidance_to_scratchpad(&guidance_events);
828
829        self.robot_guidance
830            .extend(guidance_events.into_iter().map(|e| e.payload));
831    }
832
833    /// Appends human guidance entries to the scratchpad file for durability.
834    ///
835    /// Each guidance message is written as a timestamped markdown entry so it
836    /// appears alongside the agent's own thinking and survives process restarts.
837    fn persist_guidance_to_scratchpad(&self, guidance_events: &[Event]) {
838        use std::io::Write;
839
840        let scratchpad_path = self.scratchpad_path();
841        let resolved_path = if scratchpad_path.is_relative() {
842            self.config.core.workspace_root.join(&scratchpad_path)
843        } else {
844            scratchpad_path
845        };
846
847        // Create parent directories if needed
848        if let Some(parent) = resolved_path.parent()
849            && !parent.exists()
850            && let Err(e) = std::fs::create_dir_all(parent)
851        {
852            warn!("Failed to create scratchpad directory: {}", e);
853            return;
854        }
855
856        let mut file = match std::fs::OpenOptions::new()
857            .create(true)
858            .append(true)
859            .open(&resolved_path)
860        {
861            Ok(f) => f,
862            Err(e) => {
863                warn!("Failed to open scratchpad for guidance persistence: {}", e);
864                return;
865            }
866        };
867
868        let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
869        for event in guidance_events {
870            let entry = format!(
871                "\n### HUMAN GUIDANCE ({})\n\n{}\n",
872                timestamp, event.payload
873            );
874            if let Err(e) = file.write_all(entry.as_bytes()) {
875                warn!("Failed to write guidance to scratchpad: {}", e);
876            }
877        }
878
879        info!(
880            count = guidance_events.len(),
881            "Persisted human guidance to scratchpad"
882        );
883    }
884
885    /// Injects cached guidance into the next prompt build.
886    fn apply_robot_guidance(&mut self) {
887        if self.robot_guidance.is_empty() {
888            return;
889        }
890
891        self.ralph.set_robot_guidance(self.robot_guidance.clone());
892    }
893
894    /// Prepends auto-injected skill content to the prompt.
895    ///
896    /// This generalizes the former `prepend_memories()` into a skill auto-injection
897    /// pipeline that handles memories, tools, and any other auto-inject skills.
898    ///
899    /// Injection order:
900    /// 1. Memory data + ralph-tools skill (special case: loads memory data from store, applies budget)
901    /// 2. RObot interaction skill (gated by `robot.enabled`)
902    /// 3. Other auto-inject skills from the registry (wrapped in XML tags)
903    fn prepend_auto_inject_skills(&self, prompt: String) -> String {
904        let mut prefix = String::new();
905
906        // 1. Memory data + ralph-tools skill — special case with data loading
907        self.inject_memories_and_tools_skill(&mut prefix);
908
909        // 2. RObot interaction skill — gated by robot.enabled
910        self.inject_robot_skill(&mut prefix);
911
912        // 3. Other auto-inject skills from the registry
913        self.inject_custom_auto_skills(&mut prefix);
914
915        if prefix.is_empty() {
916            return prompt;
917        }
918
919        prefix.push_str("\n\n");
920        prefix.push_str(&prompt);
921        prefix
922    }
923
924    /// Injects memory data and the ralph-tools skill into the prefix.
925    ///
926    /// Special case: loads memory entries from the store, applies budget
927    /// truncation, then appends the ralph-tools skill content (which covers
928    /// both tasks and memories CLI usage).
929    /// Memory data is gated by `memories.enabled && memories.inject == Auto`.
930    /// The ralph-tools skill is injected when either memories or tasks are enabled.
931    fn inject_memories_and_tools_skill(&self, prefix: &mut String) {
932        let memories_config = &self.config.memories;
933
934        // Inject memory DATA if memories are enabled with auto-inject
935        if memories_config.enabled && memories_config.inject == InjectMode::Auto {
936            info!(
937                "Memory injection check: enabled={}, inject={:?}, workspace_root={:?}",
938                memories_config.enabled, memories_config.inject, self.config.core.workspace_root
939            );
940
941            let workspace_root = &self.config.core.workspace_root;
942            let store = MarkdownMemoryStore::with_default_path(workspace_root);
943            let memories_path = workspace_root.join(".ralph/agent/memories.md");
944
945            info!(
946                "Looking for memories at: {:?} (exists: {})",
947                memories_path,
948                memories_path.exists()
949            );
950
951            let memories = match store.load() {
952                Ok(memories) => {
953                    info!("Successfully loaded {} memories from store", memories.len());
954                    memories
955                }
956                Err(e) => {
957                    info!(
958                        "Failed to load memories for injection: {} (path: {:?})",
959                        e, memories_path
960                    );
961                    Vec::new()
962                }
963            };
964
965            if memories.is_empty() {
966                info!("Memory store is empty - no memories to inject");
967            } else {
968                let mut memories_content = format_memories_as_markdown(&memories);
969
970                if memories_config.budget > 0 {
971                    let original_len = memories_content.len();
972                    memories_content =
973                        truncate_to_budget(&memories_content, memories_config.budget);
974                    debug!(
975                        "Applied budget: {} chars -> {} chars (budget: {})",
976                        original_len,
977                        memories_content.len(),
978                        memories_config.budget
979                    );
980                }
981
982                info!(
983                    "Injecting {} memories ({} chars) into prompt",
984                    memories.len(),
985                    memories_content.len()
986                );
987
988                prefix.push_str(&memories_content);
989            }
990        }
991
992        // Inject the ralph-tools skill when either memories or tasks are enabled
993        if memories_config.enabled || self.config.tasks.enabled {
994            if let Some(skill) = self.skill_registry.get("ralph-tools") {
995                if !prefix.is_empty() {
996                    prefix.push_str("\n\n");
997                }
998                prefix.push_str(&format!(
999                    "<ralph-tools-skill>\n{}\n</ralph-tools-skill>",
1000                    skill.content.trim()
1001                ));
1002                debug!("Injected ralph-tools skill from registry");
1003            } else {
1004                debug!("ralph-tools skill not found in registry - skill content not injected");
1005            }
1006        }
1007    }
1008
1009    /// Injects the RObot interaction skill content into the prefix.
1010    ///
1011    /// Gated by `robot.enabled`. Teaches agents how and when to interact
1012    /// with humans via `human.interact` events.
1013    fn inject_robot_skill(&self, prefix: &mut String) {
1014        if !self.config.robot.enabled {
1015            return;
1016        }
1017
1018        if let Some(skill) = self.skill_registry.get("robot-interaction") {
1019            if !prefix.is_empty() {
1020                prefix.push_str("\n\n");
1021            }
1022            prefix.push_str(&format!(
1023                "<robot-skill>\n{}\n</robot-skill>",
1024                skill.content.trim()
1025            ));
1026            debug!("Injected robot interaction skill from registry");
1027        }
1028    }
1029
1030    /// Injects any user-configured auto-inject skills (excluding built-in ralph-tools/robot-interaction).
1031    fn inject_custom_auto_skills(&self, prefix: &mut String) {
1032        for skill in self.skill_registry.auto_inject_skills(None) {
1033            // Skip built-in skills handled above
1034            if skill.name == "ralph-tools" || skill.name == "robot-interaction" {
1035                continue;
1036            }
1037
1038            if !prefix.is_empty() {
1039                prefix.push_str("\n\n");
1040            }
1041            prefix.push_str(&format!(
1042                "<{name}-skill>\n{content}\n</{name}-skill>",
1043                name = skill.name,
1044                content = skill.content.trim()
1045            ));
1046            debug!("Injected auto-inject skill: {}", skill.name);
1047        }
1048    }
1049
1050    /// Prepends scratchpad content to the prompt if the file exists and is non-empty.
1051    ///
1052    /// The scratchpad is the agent's working memory for the current objective.
1053    /// Auto-injecting saves one tool call per iteration.
1054    /// When the file exceeds the budget, the TAIL is kept (most recent entries).
1055    fn prepend_scratchpad(&self, prompt: String) -> String {
1056        let scratchpad_path = self.scratchpad_path();
1057
1058        let resolved_path = if scratchpad_path.is_relative() {
1059            self.config.core.workspace_root.join(&scratchpad_path)
1060        } else {
1061            scratchpad_path
1062        };
1063
1064        if !resolved_path.exists() {
1065            debug!(
1066                "Scratchpad not found at {:?}, skipping injection",
1067                resolved_path
1068            );
1069            return prompt;
1070        }
1071
1072        let content = match std::fs::read_to_string(&resolved_path) {
1073            Ok(c) => c,
1074            Err(e) => {
1075                info!("Failed to read scratchpad for injection: {}", e);
1076                return prompt;
1077            }
1078        };
1079
1080        if content.trim().is_empty() {
1081            debug!("Scratchpad is empty, skipping injection");
1082            return prompt;
1083        }
1084
1085        // Budget: 4000 tokens ~16000 chars. Keep the TAIL (most recent content).
1086        let char_budget = 4000 * 4;
1087        let content = if content.len() > char_budget {
1088            // Find a line boundary near the start of the tail
1089            let start = content.len() - char_budget;
1090            // Ensure we start at a valid UTF-8 character boundary
1091            let start = floor_char_boundary(&content, start);
1092            let line_start = content[start..].find('\n').map_or(start, |n| start + n + 1);
1093            let discarded = &content[..line_start];
1094
1095            // Summarize discarded content by extracting markdown headings
1096            let headings: Vec<&str> = discarded
1097                .lines()
1098                .filter(|line| line.starts_with('#'))
1099                .collect();
1100            let summary = if headings.is_empty() {
1101                format!(
1102                    "<!-- earlier content truncated ({} chars omitted) -->",
1103                    line_start
1104                )
1105            } else {
1106                format!(
1107                    "<!-- earlier content truncated ({} chars omitted) -->\n\
1108                     <!-- discarded sections: {} -->",
1109                    line_start,
1110                    headings.join(" | ")
1111                )
1112            };
1113
1114            format!("{}\n\n{}", summary, &content[line_start..])
1115        } else {
1116            content
1117        };
1118
1119        info!("Injecting scratchpad ({} chars) into prompt", content.len());
1120
1121        let mut final_prompt = format!(
1122            "<scratchpad path=\"{}\">\n{}\n</scratchpad>\n\n",
1123            self.config.core.scratchpad, content
1124        );
1125        final_prompt.push_str(&prompt);
1126        final_prompt
1127    }
1128
1129    /// Prepends ready tasks to the prompt if tasks are enabled and any exist.
1130    ///
1131    /// Loads the task store and formats ready (unblocked, open) tasks into
1132    /// a `<ready-tasks>` XML block. This saves the agent a tool call per
1133    /// iteration and puts tasks at the same prominence as the scratchpad.
1134    fn prepend_ready_tasks(&self, prompt: String) -> String {
1135        if !self.config.tasks.enabled {
1136            return prompt;
1137        }
1138
1139        use crate::task::TaskStatus;
1140        use crate::task_store::TaskStore;
1141
1142        let tasks_path = self.tasks_path();
1143        let resolved_path = if tasks_path.is_relative() {
1144            self.config.core.workspace_root.join(&tasks_path)
1145        } else {
1146            tasks_path
1147        };
1148
1149        if !resolved_path.exists() {
1150            return prompt;
1151        }
1152
1153        let store = match TaskStore::load(&resolved_path) {
1154            Ok(s) => s,
1155            Err(e) => {
1156                info!("Failed to load task store for injection: {}", e);
1157                return prompt;
1158            }
1159        };
1160
1161        let ready = store.ready();
1162        let open = store.open();
1163        let closed_count = store.all().len() - open.len();
1164
1165        if open.is_empty() && closed_count == 0 {
1166            return prompt;
1167        }
1168
1169        let mut section = String::from("<ready-tasks>\n");
1170        if ready.is_empty() && open.is_empty() {
1171            section.push_str("No open tasks. Create tasks with `ralph tools task add`.\n");
1172        } else {
1173            section.push_str(&format!(
1174                "## Tasks: {} ready, {} open, {} closed\n\n",
1175                ready.len(),
1176                open.len(),
1177                closed_count
1178            ));
1179            for task in &ready {
1180                let status_icon = match task.status {
1181                    TaskStatus::Open => "[ ]",
1182                    TaskStatus::InProgress => "[~]",
1183                    _ => "[?]",
1184                };
1185                section.push_str(&format!(
1186                    "- {} [P{}] {} ({})\n",
1187                    status_icon, task.priority, task.title, task.id
1188                ));
1189            }
1190            // Show blocked tasks separately so agent knows they exist
1191            let ready_ids: Vec<&str> = ready.iter().map(|t| t.id.as_str()).collect();
1192            let blocked: Vec<_> = open
1193                .iter()
1194                .filter(|t| !ready_ids.contains(&t.id.as_str()))
1195                .collect();
1196            if !blocked.is_empty() {
1197                section.push_str("\nBlocked:\n");
1198                for task in blocked {
1199                    section.push_str(&format!(
1200                        "- [blocked] [P{}] {} ({}) — blocked by: {}\n",
1201                        task.priority,
1202                        task.title,
1203                        task.id,
1204                        task.blocked_by.join(", ")
1205                    ));
1206                }
1207            }
1208        }
1209        section.push_str("</ready-tasks>\n\n");
1210
1211        info!(
1212            "Injecting ready tasks ({} ready, {} open, {} closed) into prompt",
1213            ready.len(),
1214            open.len(),
1215            closed_count
1216        );
1217
1218        let mut final_prompt = section;
1219        final_prompt.push_str(&prompt);
1220        final_prompt
1221    }
1222
1223    /// Builds the Ralph prompt (coordination mode).
1224    pub fn build_ralph_prompt(&self, prompt_content: &str) -> String {
1225        self.ralph.build_prompt(prompt_content, &[])
1226    }
1227
1228    /// Determines which hats should be active based on pending events.
1229    /// Returns list of Hat references that are triggered by any pending event.
1230    fn determine_active_hats(&self, events: &[Event]) -> Vec<&Hat> {
1231        let mut active_hats = Vec::new();
1232        for id in self.determine_active_hat_ids(events) {
1233            if let Some(hat) = self.registry.get(&id) {
1234                active_hats.push(hat);
1235            }
1236        }
1237        active_hats
1238    }
1239
1240    fn determine_active_hat_ids(&self, events: &[Event]) -> Vec<HatId> {
1241        let mut active_hat_ids = Vec::new();
1242        for event in events {
1243            if let Some(hat) = self.registry.get_for_topic(event.topic.as_str()) {
1244                // Avoid duplicates
1245                if !active_hat_ids.iter().any(|id| id == &hat.id) {
1246                    active_hat_ids.push(hat.id.clone());
1247                }
1248            }
1249        }
1250        active_hat_ids
1251    }
1252
1253    /// Formats an event for prompt context.
1254    ///
1255    /// For top-level prompts (task.start, task.resume), wraps the payload in
1256    /// `<top-level-prompt>` XML tags to clearly delineate the user's original request.
1257    fn format_event(event: &Event) -> String {
1258        let topic = &event.topic;
1259        let payload = &event.payload;
1260
1261        if topic.as_str() == "task.start" || topic.as_str() == "task.resume" {
1262            format!(
1263                "Event: {} - <top-level-prompt>\n{}\n</top-level-prompt>",
1264                topic, payload
1265            )
1266        } else {
1267            format!("Event: {} - {}", topic, payload)
1268        }
1269    }
1270
1271    fn check_hat_exhaustion(&mut self, hat_id: &HatId, dropped: &[Event]) -> (bool, Option<Event>) {
1272        let Some(config) = self.registry.get_config(hat_id) else {
1273            return (false, None);
1274        };
1275        let Some(max) = config.max_activations else {
1276            return (false, None);
1277        };
1278
1279        let count = *self.state.hat_activation_counts.get(hat_id).unwrap_or(&0);
1280        if count < max {
1281            return (false, None);
1282        }
1283
1284        // Emit only once per hat per run (avoid flooding).
1285        let should_emit = self.state.exhausted_hats.insert(hat_id.clone());
1286
1287        if !should_emit {
1288            // Hat is already exhausted - drop pending events silently.
1289            return (true, None);
1290        }
1291
1292        let mut dropped_topics: Vec<String> = dropped.iter().map(|e| e.topic.to_string()).collect();
1293        dropped_topics.sort();
1294
1295        let payload = format!(
1296            "Hat '{hat}' exhausted.\n- max_activations: {max}\n- activations: {count}\n- dropped_topics:\n  - {topics}",
1297            hat = hat_id.as_str(),
1298            max = max,
1299            count = count,
1300            topics = dropped_topics.join("\n  - ")
1301        );
1302
1303        warn!(
1304            hat = %hat_id.as_str(),
1305            max_activations = max,
1306            activations = count,
1307            "Hat exhausted (max_activations reached)"
1308        );
1309
1310        (
1311            true,
1312            Some(Event::new(
1313                format!("{}.exhausted", hat_id.as_str()),
1314                payload,
1315            )),
1316        )
1317    }
1318
1319    fn record_hat_activations(&mut self, active_hat_ids: &[HatId]) {
1320        for hat_id in active_hat_ids {
1321            *self
1322                .state
1323                .hat_activation_counts
1324                .entry(hat_id.clone())
1325                .or_insert(0) += 1;
1326        }
1327    }
1328
1329    /// Returns the primary active hat ID for display purposes.
1330    /// Returns the first active hat, or "ralph" if no specific hat is active.
1331    pub fn get_active_hat_id(&self) -> HatId {
1332        // Peek at pending events (don't consume them)
1333        for hat_id in self.bus.hat_ids() {
1334            let Some(events) = self.bus.peek_pending(hat_id) else {
1335                continue;
1336            };
1337            let Some(event) = events.first() else {
1338                continue;
1339            };
1340            if let Some(active_hat) = self.registry.get_for_topic(event.topic.as_str()) {
1341                return active_hat.id.clone();
1342            }
1343        }
1344        HatId::new("ralph")
1345    }
1346
1347    /// Records the current event count before hat execution.
1348    ///
1349    /// Call this before executing a hat, then use `check_default_publishes`
1350    /// after execution to inject a fallback event if needed.
1351    pub fn record_event_count(&mut self) -> usize {
1352        self.event_reader
1353            .read_new_events()
1354            .map(|r| r.events.len())
1355            .unwrap_or(0)
1356    }
1357
1358    /// Checks if hat wrote any events, and injects default if configured.
1359    ///
1360    /// Call this after hat execution with the count from `record_event_count`.
1361    /// If no new events were written AND the hat has `default_publishes` configured,
1362    /// this will inject the default event automatically.
1363    pub fn check_default_publishes(&mut self, hat_id: &HatId, _events_before: usize) {
1364        let events_after = self
1365            .event_reader
1366            .read_new_events()
1367            .map(|r| r.events.len())
1368            .unwrap_or(0);
1369
1370        if events_after == 0
1371            && let Some(config) = self.registry.get_config(hat_id)
1372            && let Some(default_topic) = &config.default_publishes
1373        {
1374            // No new events written - inject default event
1375            let default_event = Event::new(default_topic.as_str(), "").with_source(hat_id.clone());
1376
1377            debug!(
1378                hat = %hat_id.as_str(),
1379                topic = %default_topic,
1380                "No events written by hat, injecting default_publishes event"
1381            );
1382
1383            self.bus.publish(default_event);
1384        }
1385    }
1386
1387    /// Returns a mutable reference to the event bus for direct event publishing.
1388    ///
1389    /// This is primarily used for planning sessions to inject user responses
1390    /// as events into the orchestration loop.
1391    pub fn bus(&mut self) -> &mut EventBus {
1392        &mut self.bus
1393    }
1394
1395    /// Processes output from a hat execution.
1396    ///
1397    /// Returns the termination reason if the loop should stop.
1398    pub fn process_output(
1399        &mut self,
1400        hat_id: &HatId,
1401        output: &str,
1402        success: bool,
1403    ) -> Option<TerminationReason> {
1404        self.state.iteration += 1;
1405        self.state.last_hat = Some(hat_id.clone());
1406
1407        // Periodic robot check-in
1408        if let Some(interval_secs) = self.config.robot.checkin_interval_seconds
1409            && let Some(ref robot_service) = self.robot_service
1410        {
1411            let elapsed = self.state.elapsed();
1412            let interval = std::time::Duration::from_secs(interval_secs);
1413            let last = self
1414                .state
1415                .last_checkin_at
1416                .map(|t| t.elapsed())
1417                .unwrap_or(elapsed);
1418
1419            if last >= interval {
1420                let context = self.build_checkin_context(hat_id);
1421                match robot_service.send_checkin(self.state.iteration, elapsed, Some(&context)) {
1422                    Ok(_) => {
1423                        self.state.last_checkin_at = Some(std::time::Instant::now());
1424                        debug!(iteration = self.state.iteration, "Sent robot check-in");
1425                    }
1426                    Err(e) => {
1427                        warn!(error = %e, "Failed to send robot check-in");
1428                    }
1429                }
1430            }
1431        }
1432
1433        // Log iteration started
1434        self.diagnostics.log_orchestration(
1435            self.state.iteration,
1436            "loop",
1437            crate::diagnostics::OrchestrationEvent::IterationStarted,
1438        );
1439
1440        // Log hat selected
1441        self.diagnostics.log_orchestration(
1442            self.state.iteration,
1443            "loop",
1444            crate::diagnostics::OrchestrationEvent::HatSelected {
1445                hat: hat_id.to_string(),
1446                reason: "process_output".to_string(),
1447            },
1448        );
1449
1450        // Track failures
1451        if success {
1452            self.state.consecutive_failures = 0;
1453        } else {
1454            self.state.consecutive_failures += 1;
1455        }
1456
1457        let _ = output;
1458
1459        // Events are ONLY read from the JSONL file written by `ralph emit`.
1460        // This enforces tool use and prevents confabulation (agent claiming to emit without actually doing so).
1461        // See process_events_from_jsonl() for event processing.
1462
1463        // Check termination conditions
1464        self.check_termination()
1465    }
1466
1467    /// Extracts task identifier from build.blocked payload.
1468    /// Uses first line of payload as task ID.
1469    fn extract_task_id(payload: &str) -> String {
1470        payload
1471            .lines()
1472            .next()
1473            .unwrap_or("unknown")
1474            .trim()
1475            .to_string()
1476    }
1477
1478    /// Adds cost to the cumulative total.
1479    pub fn add_cost(&mut self, cost: f64) {
1480        self.state.cumulative_cost += cost;
1481    }
1482
1483    /// Verifies all tasks in scratchpad are complete or cancelled.
1484    ///
1485    /// Returns:
1486    /// - `Ok(true)` if all tasks are `[x]` or `[~]`
1487    /// - `Ok(false)` if any tasks are `[ ]` (pending)
1488    /// - `Err(...)` if scratchpad doesn't exist or can't be read
1489    fn verify_scratchpad_complete(&self) -> Result<bool, std::io::Error> {
1490        let scratchpad_path = self.scratchpad_path();
1491
1492        if !scratchpad_path.exists() {
1493            return Err(std::io::Error::new(
1494                std::io::ErrorKind::NotFound,
1495                "Scratchpad does not exist",
1496            ));
1497        }
1498
1499        let content = std::fs::read_to_string(scratchpad_path)?;
1500
1501        let has_pending = content
1502            .lines()
1503            .any(|line| line.trim_start().starts_with("- [ ]"));
1504
1505        Ok(!has_pending)
1506    }
1507
1508    fn verify_tasks_complete(&self) -> Result<bool, std::io::Error> {
1509        use crate::task_store::TaskStore;
1510
1511        let tasks_path = self.tasks_path();
1512
1513        // No tasks file = no pending tasks = complete
1514        if !tasks_path.exists() {
1515            return Ok(true);
1516        }
1517
1518        let store = TaskStore::load(&tasks_path)?;
1519        Ok(!store.has_pending_tasks())
1520    }
1521
1522    /// Builds a [`CheckinContext`] with current loop state for robot check-ins.
1523    fn build_checkin_context(&self, hat_id: &HatId) -> CheckinContext {
1524        let (open_tasks, closed_tasks) = self.count_tasks();
1525        CheckinContext {
1526            current_hat: Some(hat_id.as_str().to_string()),
1527            open_tasks,
1528            closed_tasks,
1529            cumulative_cost: self.state.cumulative_cost,
1530        }
1531    }
1532
1533    /// Counts open and closed tasks from the task store.
1534    ///
1535    /// Returns `(open_count, closed_count)`. "Open" means non-terminal tasks,
1536    /// "closed" means tasks with `TaskStatus::Closed`.
1537    fn count_tasks(&self) -> (usize, usize) {
1538        use crate::task::TaskStatus;
1539        use crate::task_store::TaskStore;
1540
1541        let tasks_path = self.tasks_path();
1542        if !tasks_path.exists() {
1543            return (0, 0);
1544        }
1545
1546        match TaskStore::load(&tasks_path) {
1547            Ok(store) => {
1548                let total = store.all().len();
1549                let open = store.open().len();
1550                let closed = total - open;
1551                // Verify: closed should match Closed status count
1552                debug_assert_eq!(
1553                    closed,
1554                    store
1555                        .all()
1556                        .iter()
1557                        .filter(|t| t.status == TaskStatus::Closed)
1558                        .count()
1559                );
1560                (open, closed)
1561            }
1562            Err(_) => (0, 0),
1563        }
1564    }
1565
1566    /// Returns a list of open task descriptions for logging purposes.
1567    fn get_open_task_list(&self) -> Vec<String> {
1568        use crate::task_store::TaskStore;
1569
1570        let tasks_path = self.tasks_path();
1571        if let Ok(store) = TaskStore::load(&tasks_path) {
1572            return store
1573                .open()
1574                .iter()
1575                .map(|t| format!("{}: {}", t.id, t.title))
1576                .collect();
1577        }
1578        vec![]
1579    }
1580
1581    fn warn_on_mutation_evidence(&self, evidence: &crate::event_parser::BackpressureEvidence) {
1582        let threshold = self.config.event_loop.mutation_score_warn_threshold;
1583
1584        match &evidence.mutants {
1585            Some(mutants) => {
1586                if let Some(reason) = Self::mutation_warning_reason(mutants, threshold) {
1587                    warn!(
1588                        reason = %reason,
1589                        mutants_status = ?mutants.status,
1590                        mutants_score = mutants.score_percent,
1591                        mutants_threshold = threshold,
1592                        "Mutation testing warning"
1593                    );
1594                }
1595            }
1596            None => {
1597                if let Some(threshold) = threshold {
1598                    warn!(
1599                        mutants_threshold = threshold,
1600                        "Mutation testing warning: missing mutation evidence in build.done payload"
1601                    );
1602                }
1603            }
1604        }
1605    }
1606
1607    fn mutation_warning_reason(
1608        mutants: &MutationEvidence,
1609        threshold: Option<f64>,
1610    ) -> Option<String> {
1611        match mutants.status {
1612            MutationStatus::Fail => Some("mutation testing failed".to_string()),
1613            MutationStatus::Warn => Some(Self::format_mutation_message(
1614                "mutation score below threshold",
1615                mutants.score_percent,
1616            )),
1617            MutationStatus::Unknown => Some("mutation testing status unknown".to_string()),
1618            MutationStatus::Pass => {
1619                let threshold = threshold?;
1620
1621                match mutants.score_percent {
1622                    Some(score) if score < threshold => Some(format!(
1623                        "mutation score {:.2}% below threshold {:.2}%",
1624                        score, threshold
1625                    )),
1626                    Some(_) => None,
1627                    None => Some(format!(
1628                        "mutation score missing (threshold {:.2}%)",
1629                        threshold
1630                    )),
1631                }
1632            }
1633        }
1634    }
1635
1636    fn format_mutation_message(message: &str, score: Option<f64>) -> String {
1637        match score {
1638            Some(score) => format!("{message} ({score:.2}%)"),
1639            None => message.to_string(),
1640        }
1641    }
1642
1643    /// Processes events from JSONL and routes orphaned events to Ralph.
1644    ///
1645    /// Also handles backpressure for malformed JSONL lines by:
1646    /// 1. Emitting `event.malformed` system events for each parse failure
1647    /// 2. Tracking consecutive failures for termination check
1648    /// 3. Resetting counter when valid events are parsed
1649    ///
1650    /// Returns true if Ralph should be invoked to handle orphaned events.
1651    pub fn process_events_from_jsonl(&mut self) -> std::io::Result<bool> {
1652        let result = self.event_reader.read_new_events()?;
1653
1654        // Handle malformed lines with backpressure
1655        for malformed in &result.malformed {
1656            let payload = format!(
1657                "Line {}: {}\nContent: {}",
1658                malformed.line_number, malformed.error, &malformed.content
1659            );
1660            let event = Event::new("event.malformed", &payload);
1661            self.bus.publish(event);
1662            self.state.consecutive_malformed_events += 1;
1663            warn!(
1664                line = malformed.line_number,
1665                consecutive = self.state.consecutive_malformed_events,
1666                "Malformed event line detected"
1667            );
1668        }
1669
1670        // Reset counter when valid events are parsed
1671        if !result.events.is_empty() {
1672            self.state.consecutive_malformed_events = 0;
1673        }
1674
1675        if result.events.is_empty() && result.malformed.is_empty() {
1676            return Ok(false);
1677        }
1678
1679        let mut has_orphans = false;
1680
1681        // Validate and transform events (apply backpressure for build.done)
1682        let mut validated_events = Vec::new();
1683        let completion_topic = self.config.event_loop.completion_promise.as_str();
1684        let total_events = result.events.len();
1685        for (index, event) in result.events.into_iter().enumerate() {
1686            let payload = event.payload.clone().unwrap_or_default();
1687
1688            if event.topic == completion_topic {
1689                if index + 1 == total_events {
1690                    self.state.completion_requested = true;
1691                    self.diagnostics.log_orchestration(
1692                        self.state.iteration,
1693                        "jsonl",
1694                        crate::diagnostics::OrchestrationEvent::EventPublished {
1695                            topic: event.topic.clone(),
1696                        },
1697                    );
1698                    info!(
1699                        topic = %event.topic,
1700                        "Completion event detected in JSONL"
1701                    );
1702                } else {
1703                    warn!(
1704                        topic = %event.topic,
1705                        index = index,
1706                        total_events = total_events,
1707                        "Completion event ignored because it was not the last event"
1708                    );
1709                }
1710                continue;
1711            }
1712
1713            if event.topic == "build.done" {
1714                // Validate build.done events have backpressure evidence
1715                if let Some(evidence) = EventParser::parse_backpressure_evidence(&payload) {
1716                    if evidence.all_passed() {
1717                        self.warn_on_mutation_evidence(&evidence);
1718                        validated_events.push(Event::new(event.topic.as_str(), &payload));
1719                    } else {
1720                        // Evidence present but checks failed - synthesize build.blocked
1721                        warn!(
1722                            tests = evidence.tests_passed,
1723                            lint = evidence.lint_passed,
1724                            typecheck = evidence.typecheck_passed,
1725                            audit = evidence.audit_passed,
1726                            coverage = evidence.coverage_passed,
1727                            complexity = evidence.complexity_score,
1728                            duplication = evidence.duplication_passed,
1729                            performance = evidence.performance_regression,
1730                            specs = evidence.specs_verified,
1731                            "build.done rejected: backpressure checks failed"
1732                        );
1733
1734                        let complexity = evidence
1735                            .complexity_score
1736                            .map(|value| format!("{value:.2}"))
1737                            .unwrap_or_else(|| "missing".to_string());
1738                        let performance = match evidence.performance_regression {
1739                            Some(true) => "regression".to_string(),
1740                            Some(false) => "pass".to_string(),
1741                            None => "missing".to_string(),
1742                        };
1743                        let specs = match evidence.specs_verified {
1744                            Some(true) => "pass".to_string(),
1745                            Some(false) => "fail".to_string(),
1746                            None => "not reported".to_string(),
1747                        };
1748
1749                        self.diagnostics.log_orchestration(
1750                            self.state.iteration,
1751                            "jsonl",
1752                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1753                                reason: format!(
1754                                    "backpressure checks failed: tests={}, lint={}, typecheck={}, audit={}, coverage={}, complexity={}, duplication={}, performance={}, specs={}",
1755                                    evidence.tests_passed,
1756                                    evidence.lint_passed,
1757                                    evidence.typecheck_passed,
1758                                    evidence.audit_passed,
1759                                    evidence.coverage_passed,
1760                                    complexity,
1761                                    evidence.duplication_passed,
1762                                    performance,
1763                                    specs
1764                                ),
1765                            },
1766                        );
1767
1768                        validated_events.push(Event::new(
1769                            "build.blocked",
1770                            "Backpressure checks failed. Fix tests/lint/typecheck/audit/coverage/complexity/duplication/specs before emitting build.done.",
1771                        ));
1772                    }
1773                } else {
1774                    // No evidence found - synthesize build.blocked
1775                    warn!("build.done rejected: missing backpressure evidence");
1776
1777                    self.diagnostics.log_orchestration(
1778                        self.state.iteration,
1779                        "jsonl",
1780                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1781                            reason: "missing backpressure evidence".to_string(),
1782                        },
1783                    );
1784
1785                    validated_events.push(Event::new(
1786                        "build.blocked",
1787                        "Missing backpressure evidence. Include 'tests: pass', 'lint: pass', 'typecheck: pass', 'audit: pass', 'coverage: pass', 'complexity: <score>', 'duplication: pass', 'performance: pass' (optional), 'specs: pass' (optional) in build.done payload.",
1788                    ));
1789                }
1790            } else if event.topic == "review.done" {
1791                // Validate review.done events have verification evidence
1792                if let Some(evidence) = EventParser::parse_review_evidence(&payload) {
1793                    if evidence.is_verified() {
1794                        validated_events.push(Event::new(event.topic.as_str(), &payload));
1795                    } else {
1796                        // Evidence present but checks failed - synthesize review.blocked
1797                        warn!(
1798                            tests = evidence.tests_passed,
1799                            build = evidence.build_passed,
1800                            "review.done rejected: verification checks failed"
1801                        );
1802
1803                        self.diagnostics.log_orchestration(
1804                            self.state.iteration,
1805                            "jsonl",
1806                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1807                                reason: format!(
1808                                    "review verification failed: tests={}, build={}",
1809                                    evidence.tests_passed, evidence.build_passed
1810                                ),
1811                            },
1812                        );
1813
1814                        validated_events.push(Event::new(
1815                            "review.blocked",
1816                            "Review verification failed. Run tests and build before emitting review.done.",
1817                        ));
1818                    }
1819                } else {
1820                    // No evidence found - synthesize review.blocked
1821                    warn!("review.done rejected: missing verification evidence");
1822
1823                    self.diagnostics.log_orchestration(
1824                        self.state.iteration,
1825                        "jsonl",
1826                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1827                            reason: "missing review verification evidence".to_string(),
1828                        },
1829                    );
1830
1831                    validated_events.push(Event::new(
1832                        "review.blocked",
1833                        "Missing verification evidence. Include 'tests: pass' and 'build: pass' in review.done payload.",
1834                    ));
1835                }
1836            } else if event.topic == "verify.passed" {
1837                if let Some(report) = EventParser::parse_quality_report(&payload) {
1838                    if report.meets_thresholds() {
1839                        validated_events.push(Event::new(event.topic.as_str(), &payload));
1840                    } else {
1841                        let failed = report.failed_dimensions();
1842                        let reason = if failed.is_empty() {
1843                            "quality thresholds failed".to_string()
1844                        } else {
1845                            format!("quality thresholds failed: {}", failed.join(", "))
1846                        };
1847
1848                        warn!(
1849                            failed_dimensions = ?failed,
1850                            "verify.passed rejected: quality thresholds failed"
1851                        );
1852
1853                        self.diagnostics.log_orchestration(
1854                            self.state.iteration,
1855                            "jsonl",
1856                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1857                                reason,
1858                            },
1859                        );
1860
1861                        validated_events.push(Event::new(
1862                            "verify.failed",
1863                            "Quality thresholds failed. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity with thresholds in verify.passed payload.",
1864                        ));
1865                    }
1866                } else {
1867                    // No quality report found - synthesize verify.failed
1868                    warn!("verify.passed rejected: missing quality report");
1869
1870                    self.diagnostics.log_orchestration(
1871                        self.state.iteration,
1872                        "jsonl",
1873                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1874                            reason: "missing quality report".to_string(),
1875                        },
1876                    );
1877
1878                    validated_events.push(Event::new(
1879                        "verify.failed",
1880                        "Missing quality report. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity in verify.passed payload.",
1881                    ));
1882                }
1883            } else if event.topic == "verify.failed" {
1884                if EventParser::parse_quality_report(&payload).is_none() {
1885                    warn!("verify.failed missing quality report");
1886                }
1887                validated_events.push(Event::new(event.topic.as_str(), &payload));
1888            } else {
1889                // Non-backpressure events pass through unchanged
1890                validated_events.push(Event::new(event.topic.as_str(), &payload));
1891            }
1892        }
1893
1894        // Track build.blocked events for thrashing detection
1895        let blocked_events: Vec<_> = validated_events
1896            .iter()
1897            .filter(|e| e.topic == "build.blocked".into())
1898            .collect();
1899
1900        for blocked_event in &blocked_events {
1901            let task_id = Self::extract_task_id(&blocked_event.payload);
1902
1903            let count = self
1904                .state
1905                .task_block_counts
1906                .entry(task_id.clone())
1907                .or_insert(0);
1908            *count += 1;
1909
1910            debug!(
1911                task_id = %task_id,
1912                block_count = *count,
1913                "Task blocked"
1914            );
1915
1916            // After 3 blocks on same task, emit build.task.abandoned
1917            if *count >= 3 && !self.state.abandoned_tasks.contains(&task_id) {
1918                warn!(
1919                    task_id = %task_id,
1920                    "Task abandoned after 3 consecutive blocks"
1921                );
1922
1923                self.state.abandoned_tasks.push(task_id.clone());
1924
1925                self.diagnostics.log_orchestration(
1926                    self.state.iteration,
1927                    "jsonl",
1928                    crate::diagnostics::OrchestrationEvent::TaskAbandoned {
1929                        reason: format!(
1930                            "3 consecutive build.blocked events for task '{}'",
1931                            task_id
1932                        ),
1933                    },
1934                );
1935
1936                let abandoned_event = Event::new(
1937                    "build.task.abandoned",
1938                    format!(
1939                        "Task '{}' abandoned after 3 consecutive build.blocked events",
1940                        task_id
1941                    ),
1942                );
1943
1944                self.bus.publish(abandoned_event);
1945            }
1946        }
1947
1948        // Track hat-level blocking for legacy thrashing detection
1949        let has_blocked_event = !blocked_events.is_empty();
1950
1951        if has_blocked_event {
1952            self.state.consecutive_blocked += 1;
1953        } else {
1954            self.state.consecutive_blocked = 0;
1955            self.state.last_blocked_hat = None;
1956        }
1957
1958        // Handle human.interact blocking behavior:
1959        // When a human.interact event is detected and robot service is active,
1960        // send the question and block until human.response or timeout.
1961        let mut response_event = None;
1962        let ask_human_idx = validated_events
1963            .iter()
1964            .position(|e| e.topic == "human.interact".into());
1965
1966        if let Some(idx) = ask_human_idx {
1967            let ask_event = &validated_events[idx];
1968            let payload = ask_event.payload.clone();
1969
1970            if let Some(ref robot_service) = self.robot_service {
1971                info!(
1972                    payload = %payload,
1973                    "human.interact event detected — sending question via robot service"
1974                );
1975
1976                // Send the question (includes retry with exponential backoff)
1977                let send_ok = match robot_service.send_question(&payload) {
1978                    Ok(_message_id) => true,
1979                    Err(e) => {
1980                        warn!(
1981                            error = %e,
1982                            "Failed to send human.interact question after retries — treating as timeout"
1983                        );
1984                        // Log to diagnostics
1985                        self.diagnostics.log_error(
1986                            self.state.iteration,
1987                            "telegram",
1988                            crate::diagnostics::DiagnosticError::TelegramSendError {
1989                                operation: "send_question".to_string(),
1990                                error: e.to_string(),
1991                                retry_count: 3,
1992                            },
1993                        );
1994                        false
1995                    }
1996                };
1997
1998                // Block: poll events file for human.response
1999                // Per spec, even on send failure we treat as timeout (continue without blocking)
2000                if send_ok {
2001                    // Read the active events path from the current-events marker,
2002                    // falling back to the default events.jsonl if not available.
2003                    let events_path = self
2004                        .loop_context
2005                        .as_ref()
2006                        .and_then(|ctx| {
2007                            std::fs::read_to_string(ctx.current_events_marker())
2008                                .ok()
2009                                .map(|s| ctx.workspace().join(s.trim()))
2010                        })
2011                        .or_else(|| {
2012                            std::fs::read_to_string(".ralph/current-events")
2013                                .ok()
2014                                .map(|s| PathBuf::from(s.trim()))
2015                        })
2016                        .unwrap_or_else(|| {
2017                            self.loop_context
2018                                .as_ref()
2019                                .map(|ctx| ctx.events_path())
2020                                .unwrap_or_else(|| PathBuf::from(".ralph/events.jsonl"))
2021                        });
2022
2023                    match robot_service.wait_for_response(&events_path) {
2024                        Ok(Some(response)) => {
2025                            info!(
2026                                response = %response,
2027                                "Received human.response — continuing loop"
2028                            );
2029                            // Create a human.response event to inject into the bus
2030                            response_event = Some(Event::new("human.response", &response));
2031                        }
2032                        Ok(None) => {
2033                            warn!(
2034                                timeout_secs = robot_service.timeout_secs(),
2035                                "Human response timeout — continuing without response"
2036                            );
2037                        }
2038                        Err(e) => {
2039                            warn!(
2040                                error = %e,
2041                                "Error waiting for human response — continuing without response"
2042                            );
2043                        }
2044                    }
2045                }
2046            } else {
2047                debug!(
2048                    "human.interact event detected but no robot service active — passing through"
2049                );
2050            }
2051        }
2052
2053        // Publish validated events to the bus.
2054        // Ralph is always registered with subscribe("*"), so every event has at least
2055        // one subscriber. Events without a specific hat subscriber are "orphaned" —
2056        // Ralph handles them as the universal fallback.
2057        for event in validated_events {
2058            self.diagnostics.log_orchestration(
2059                self.state.iteration,
2060                "jsonl",
2061                crate::diagnostics::OrchestrationEvent::EventPublished {
2062                    topic: event.topic.to_string(),
2063                },
2064            );
2065
2066            if !self.registry.has_subscriber(event.topic.as_str()) {
2067                has_orphans = true;
2068            }
2069
2070            debug!(
2071                topic = %event.topic,
2072                "Publishing event from JSONL"
2073            );
2074            self.bus.publish(event);
2075        }
2076
2077        // Publish human.response event if one was received during blocking
2078        if let Some(response) = response_event {
2079            info!(
2080                topic = %response.topic,
2081                "Publishing human.response event from robot service"
2082            );
2083            self.bus.publish(response);
2084        }
2085
2086        Ok(has_orphans)
2087    }
2088
2089    /// Checks if output contains a completion event from Ralph.
2090    ///
2091    /// Completion must be emitted as an `<event>` tag, not plain text.
2092    pub fn check_ralph_completion(&self, output: &str) -> bool {
2093        let events = EventParser::new().parse(output);
2094        events
2095            .iter()
2096            .any(|event| event.topic.as_str() == self.config.event_loop.completion_promise)
2097    }
2098
2099    /// Publishes the loop.terminate system event to observers.
2100    ///
2101    /// Per spec: "Published by the orchestrator (not agents) when the loop exits."
2102    /// This is an observer-only event—hats cannot trigger on it.
2103    ///
2104    /// Returns the event for logging purposes.
2105    pub fn publish_terminate_event(&mut self, reason: &TerminationReason) -> Event {
2106        // Stop the robot service if it was running
2107        self.stop_robot_service();
2108
2109        let elapsed = self.state.elapsed();
2110        let duration_str = format_duration(elapsed);
2111
2112        let payload = format!(
2113            "## Reason\n{}\n\n## Status\n{}\n\n## Summary\n- Iterations: {}\n- Duration: {}\n- Exit code: {}",
2114            reason.as_str(),
2115            termination_status_text(reason),
2116            self.state.iteration,
2117            duration_str,
2118            reason.exit_code()
2119        );
2120
2121        let event = Event::new("loop.terminate", &payload);
2122
2123        // Publish to bus for observers (but no hat can trigger on this)
2124        self.bus.publish(event.clone());
2125
2126        info!(
2127            reason = %reason.as_str(),
2128            iterations = self.state.iteration,
2129            duration = %duration_str,
2130            "Wrapping up: {}. {} iterations in {}.",
2131            reason.as_str(),
2132            self.state.iteration,
2133            duration_str
2134        );
2135
2136        event
2137    }
2138
2139    /// Returns the robot service's shutdown flag, if active.
2140    ///
2141    /// Signal handlers can set this flag to interrupt `wait_for_response()`
2142    /// without waiting for the full timeout.
2143    pub fn robot_shutdown_flag(&self) -> Option<Arc<AtomicBool>> {
2144        self.robot_service.as_ref().map(|s| s.shutdown_flag())
2145    }
2146
2147    /// Stops the robot service if it's running.
2148    ///
2149    /// Called during loop termination to cleanly shut down the communication backend.
2150    fn stop_robot_service(&mut self) {
2151        if let Some(service) = self.robot_service.take() {
2152            service.stop();
2153        }
2154    }
2155
2156    // -------------------------------------------------------------------------
2157    // Human-in-the-loop planning support
2158    // -------------------------------------------------------------------------
2159
2160    /// Check if any event is a `user.prompt` event.
2161    ///
2162    /// Returns the first user prompt event found, or None.
2163    pub fn check_for_user_prompt(&self, events: &[Event]) -> Option<UserPrompt> {
2164        events
2165            .iter()
2166            .find(|e| e.topic.as_str() == "user.prompt")
2167            .map(|e| UserPrompt {
2168                id: Self::extract_prompt_id(&e.payload),
2169                text: e.payload.clone(),
2170            })
2171    }
2172
2173    /// Extract a prompt ID from the event payload.
2174    ///
2175    /// Supports both XML attribute format: `<event topic="user.prompt" id="q1">...</event>`
2176    /// and JSON format in payload.
2177    fn extract_prompt_id(payload: &str) -> String {
2178        // Try to extract id attribute from XML-like format first
2179        if let Some(start) = payload.find("id=\"")
2180            && let Some(end) = payload[start + 4..].find('"')
2181        {
2182            return payload[start + 4..start + 4 + end].to_string();
2183        }
2184
2185        // Fallback: generate a simple ID based on timestamp
2186        format!("q{}", Self::generate_prompt_id())
2187    }
2188
2189    /// Generate a simple unique ID for prompts.
2190    /// Uses timestamp-based generation since uuid crate isn't available.
2191    fn generate_prompt_id() -> String {
2192        use std::time::{SystemTime, UNIX_EPOCH};
2193        let nanos = SystemTime::now()
2194            .duration_since(UNIX_EPOCH)
2195            .unwrap()
2196            .as_nanos();
2197        format!("{:x}", nanos % 0xFFFF_FFFF)
2198    }
2199}
2200
2201/// A user prompt that requires human input.
2202///
2203/// Created when the agent emits a `user.prompt` event during planning.
2204#[derive(Debug, Clone)]
2205pub struct UserPrompt {
2206    /// Unique identifier for this prompt (e.g., "q1", "q2")
2207    pub id: String,
2208    /// The prompt/question text
2209    pub text: String,
2210}
2211
2212/// Formats a duration as human-readable string.
2213fn format_duration(d: Duration) -> String {
2214    let total_secs = d.as_secs();
2215    let hours = total_secs / 3600;
2216    let minutes = (total_secs % 3600) / 60;
2217    let seconds = total_secs % 60;
2218
2219    if hours > 0 {
2220        format!("{}h {}m {}s", hours, minutes, seconds)
2221    } else if minutes > 0 {
2222        format!("{}m {}s", minutes, seconds)
2223    } else {
2224        format!("{}s", seconds)
2225    }
2226}
2227
2228/// Returns a human-readable status based on termination reason.
2229fn termination_status_text(reason: &TerminationReason) -> &'static str {
2230    match reason {
2231        TerminationReason::CompletionPromise => "All tasks completed successfully.",
2232        TerminationReason::MaxIterations => "Stopped at iteration limit.",
2233        TerminationReason::MaxRuntime => "Stopped at runtime limit.",
2234        TerminationReason::MaxCost => "Stopped at cost limit.",
2235        TerminationReason::ConsecutiveFailures => "Too many consecutive failures.",
2236        TerminationReason::LoopThrashing => {
2237            "Loop thrashing detected - same hat repeatedly blocked."
2238        }
2239        TerminationReason::ValidationFailure => "Too many consecutive malformed JSONL events.",
2240        TerminationReason::Stopped => "Manually stopped.",
2241        TerminationReason::Interrupted => "Interrupted by signal.",
2242        TerminationReason::RestartRequested => "Restarting by human request.",
2243    }
2244}