Skip to main content

ralph_core/event_loop/
mod.rs

1//! Event loop orchestration.
2//!
3//! The event loop coordinates the execution of hats via pub/sub messaging.
4
5mod loop_state;
6#[cfg(test)]
7mod tests;
8
9pub use loop_state::LoopState;
10
11use crate::config::{HatBackend, InjectMode, RalphConfig, ScratchpadConfig};
12use crate::event_parser::{EventParser, MutationEvidence, MutationStatus};
13use crate::event_reader::EventReader;
14use crate::hat_registry::HatRegistry;
15use crate::hatless_ralph::HatlessRalph;
16use crate::instructions::InstructionBuilder;
17use crate::loop_context::LoopContext;
18use crate::memory_store::{MarkdownMemoryStore, format_memories_as_markdown, truncate_to_budget};
19use crate::skill_registry::SkillRegistry;
20use crate::text::floor_char_boundary;
21use ralph_proto::{CheckinContext, Event, EventBus, Hat, HatId, RobotService};
22use serde_json::{Map, Value};
23use std::path::PathBuf;
24use std::sync::Arc;
25use std::sync::atomic::AtomicBool;
26use std::time::Duration;
27use tracing::{debug, info, warn};
28
/// Result of processing events from JSONL.
///
/// A plain summary record: every field is public so callers can inspect the
/// outcome of a processing pass directly.
#[derive(Debug, Clone)]
pub struct ProcessedEvents {
    /// Whether any valid events were found and published.
    pub had_events: bool,
    /// Whether any published events matched the semantic `plan.*` topic family.
    pub had_plan_events: bool,
    /// Structured context for the first processed `human.interact` event,
    /// including the question payload and post-dispatch outcome metadata.
    /// `None` when no such context was recorded.
    pub human_interact_context: Option<Value>,
    /// Whether any events lacked specific hat subscribers (orphans handled by Ralph).
    pub has_orphans: bool,
}
42
/// Result of processing events from JSONL with wave events partitioned out.
///
/// Pairs the regular [`ProcessedEvents`] summary with the wave events that
/// were set aside instead of going through normal processing.
#[derive(Debug)]
pub struct ProcessedEventsWithWaves {
    /// Normal event processing results.
    pub processed: ProcessedEvents,
    /// Wave events extracted before normal processing (have wave_id set).
    pub wave_events: Vec<crate::event_reader::Event>,
}
51
/// Why the event loop stopped running.
///
/// Each variant maps to a process exit code (see [`TerminationReason::exit_code`])
/// and to a stable string identifier (see [`TerminationReason::as_str`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationReason {
    /// The completion promise was detected in the output.
    CompletionPromise,
    /// The configured iteration limit was reached.
    MaxIterations,
    /// The configured runtime limit was exceeded.
    MaxRuntime,
    /// The configured cost limit was exceeded.
    MaxCost,
    /// Too many failures occurred back to back.
    ConsecutiveFailures,
    /// The loop is thrashing (repeated blocked events).
    LoopThrashing,
    /// The loop went stale (the same event emitted 3+ times consecutively).
    LoopStale,
    /// Too many consecutive malformed JSONL lines in the events file.
    ValidationFailure,
    /// The loop was stopped manually.
    Stopped,
    /// The loop was interrupted by a signal (SIGINT/SIGTERM).
    Interrupted,
    /// A restart was requested via the Telegram `/restart` command.
    RestartRequested,
    /// The workspace directory (worktree) was removed externally.
    WorkspaceGone,
    /// The loop was cancelled gracefully via a `loop.cancel` event
    /// (human rejection, timeout).
    Cancelled,
}

impl TerminationReason {
    /// Maps this reason to the process exit code.
    ///
    /// - `0`: completion promise detected, or a graceful cancellation
    /// - `1`: consecutive failures, thrashing, stale loop, validation
    ///   failure, manual stop, or a vanished workspace
    /// - `2`: a configured limit was hit (iterations, runtime, or cost)
    /// - `3`: restart requested; signals the caller to exec-replace
    /// - `130`: user interrupt (SIGINT = 128 + 2)
    pub fn exit_code(&self) -> i32 {
        match self {
            // Clean exits: the work finished, or the loop stopped intentionally.
            Self::CompletionPromise | Self::Cancelled => 0,
            // Failures.
            Self::ConsecutiveFailures
            | Self::LoopThrashing
            | Self::LoopStale
            | Self::ValidationFailure
            | Self::Stopped
            | Self::WorkspaceGone => 1,
            // Configured limits.
            Self::MaxIterations | Self::MaxRuntime | Self::MaxCost => 2,
            // Restart: the caller is expected to exec-replace the process.
            Self::RestartRequested => 3,
            Self::Interrupted => 130,
        }
    }

    /// Returns the snake_case identifier used as the reason string in the
    /// `loop.terminate` event payload.
    ///
    /// Every variant has its own identifier; `CompletionPromise` is reported
    /// as `"completed"`.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::CompletionPromise => "completed",
            Self::MaxIterations => "max_iterations",
            Self::MaxRuntime => "max_runtime",
            Self::MaxCost => "max_cost",
            Self::ConsecutiveFailures => "consecutive_failures",
            Self::LoopThrashing => "loop_thrashing",
            Self::LoopStale => "loop_stale",
            Self::ValidationFailure => "validation_failure",
            Self::Stopped => "stopped",
            Self::Interrupted => "interrupted",
            Self::RestartRequested => "restart_requested",
            Self::WorkspaceGone => "workspace_gone",
            Self::Cancelled => "cancelled",
        }
    }

    /// Returns `true` only for [`TerminationReason::CompletionPromise`].
    ///
    /// Note that `Cancelled` exits with code 0 but is not reported as a
    /// success here.
    pub fn is_success(&self) -> bool {
        *self == Self::CompletionPromise
    }
}
138
/// The main event loop orchestrator.
///
/// Bundles the configuration, hat registry, event bus, and mutable loop
/// state together with the supporting services listed below.
pub struct EventLoop {
    /// Configuration the loop was constructed from.
    config: RalphConfig,
    /// Registry of custom hats, built from the configuration.
    registry: HatRegistry,
    /// Event bus on which every custom hat and the catch-all "ralph" hat are registered.
    bus: EventBus,
    /// Mutable per-run state (counters and request flags).
    state: LoopState,
    /// Instruction builder created from the core and events configuration.
    instruction_builder: InstructionBuilder,
    /// The constant Ralph coordinator; also holds the objective and the
    /// active scratchpad configuration.
    ralph: HatlessRalph,
    /// Cached human guidance messages that should persist across iterations.
    robot_guidance: Vec<String>,
    /// Event reader for consuming events from JSONL file.
    /// Made pub(crate) to allow tests to override the path.
    pub(crate) event_reader: EventReader,
    /// Diagnostics collector (may be a disabled collector when setup failed).
    diagnostics: crate::diagnostics::DiagnosticsCollector,
    /// Loop context for path resolution (None for legacy single-loop mode).
    loop_context: Option<LoopContext>,
    /// Skill registry for the current loop.
    skill_registry: SkillRegistry,
    /// Robot service for human-in-the-loop communication.
    /// Injected externally when `human.enabled` is true and this is the primary loop.
    robot_service: Option<Box<dyn RobotService>>,
}
161
162impl EventLoop {
163    /// Creates a new event loop from configuration.
164    pub fn new(config: RalphConfig) -> Self {
165        // Try to create diagnostics collector, but fall back to disabled if it fails
166        // (e.g., in tests without proper directory setup)
167        let diagnostics = crate::diagnostics::DiagnosticsCollector::new(std::path::Path::new("."))
168            .unwrap_or_else(|e| {
169                debug!(
170                    "Failed to initialize diagnostics: {}, using disabled collector",
171                    e
172                );
173                crate::diagnostics::DiagnosticsCollector::disabled()
174            });
175
176        Self::with_diagnostics(config, diagnostics)
177    }
178
179    /// Creates a new event loop with a loop context for path resolution.
180    ///
181    /// The loop context determines where events, tasks, and other state files
182    /// are located. Use this for multi-loop scenarios where each loop runs
183    /// in an isolated workspace (git worktree).
184    pub fn with_context(config: RalphConfig, context: LoopContext) -> Self {
185        let diagnostics = crate::diagnostics::DiagnosticsCollector::new(context.workspace())
186            .unwrap_or_else(|e| {
187                debug!(
188                    "Failed to initialize diagnostics: {}, using disabled collector",
189                    e
190                );
191                crate::diagnostics::DiagnosticsCollector::disabled()
192            });
193
194        Self::with_context_and_diagnostics(config, context, diagnostics)
195    }
196
197    /// Creates a new event loop with explicit loop context and diagnostics.
198    pub fn with_context_and_diagnostics(
199        mut config: RalphConfig,
200        context: LoopContext,
201        diagnostics: crate::diagnostics::DiagnosticsCollector,
202    ) -> Self {
203        // Solo mode safety guard: force scratchpad enabled when no hats defined
204        if config.hats.is_empty() && !config.core.scratchpad.enabled {
205            warn!(
206                "core.scratchpad.enabled is false but no hats are defined. \
207                 Scratchpad is the only continuity mechanism in solo mode — forcing enabled."
208            );
209            config.core.scratchpad.enabled = true;
210        }
211
212        let registry = HatRegistry::from_config(&config);
213        let instruction_builder =
214            InstructionBuilder::with_events(config.core.clone(), config.events.clone());
215
216        let mut bus = EventBus::new();
217
218        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
219        // Ralph is ALWAYS registered as the universal fallback for orphaned events.
220        // Custom hats are registered first (higher priority), Ralph catches everything else.
221        for hat in registry.all() {
222            bus.register(hat.clone());
223        }
224
225        // Always register Ralph as catch-all coordinator
226        // Per spec: "Ralph runs when no hat triggered — Universal fallback for orphaned events"
227        let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); // Subscribe to all events
228        bus.register(ralph_hat);
229
230        if registry.is_empty() {
231            debug!("Solo mode: Ralph is the only coordinator");
232        } else {
233            debug!(
234                "Multi-hat mode: {} custom hats + Ralph as fallback",
235                registry.len()
236            );
237        }
238
239        // Build skill registry from config
240        let mut skill_registry = if config.skills.enabled {
241            SkillRegistry::from_config(
242                &config.skills,
243                context.workspace(),
244                Some(config.cli.backend.as_str()),
245            )
246            .unwrap_or_else(|e| {
247                warn!(
248                    "Failed to build skill registry: {}, using empty registry",
249                    e
250                );
251                SkillRegistry::new(Some(config.cli.backend.as_str()))
252            })
253        } else {
254            SkillRegistry::new(Some(config.cli.backend.as_str()))
255        };
256
257        // Remove task/memory skills from the index when their config is disabled
258        if !config.tasks.enabled {
259            skill_registry.remove("ralph-tools-tasks");
260        }
261        if !config.memories.enabled {
262            skill_registry.remove("ralph-tools-memories");
263        }
264
265        let skill_index = if config.skills.enabled {
266            skill_registry.build_index(None)
267        } else {
268            String::new()
269        };
270
271        // When memories are enabled, add tasks CLI instructions alongside scratchpad
272        let ralph = HatlessRalph::new(
273            config.event_loop.completion_promise.clone(),
274            config.core.clone(),
275            &registry,
276            config.event_loop.starting_event.clone(),
277        )
278        .with_memories_enabled(config.memories.enabled)
279        .with_skill_index(skill_index);
280
281        // Read timestamped events path from marker file, fall back to default
282        // The marker file contains a relative path like ".ralph/events-20260127-123456.jsonl"
283        // which we resolve relative to the workspace root
284        let events_path = std::fs::read_to_string(context.current_events_marker())
285            .map(|s| {
286                let relative = s.trim();
287                context.workspace().join(relative)
288            })
289            .unwrap_or_else(|_| context.events_path());
290        let event_reader = EventReader::new(&events_path);
291
292        Self {
293            config,
294            registry,
295            bus,
296            state: LoopState::new(),
297            instruction_builder,
298            ralph,
299            robot_guidance: Vec::new(),
300            event_reader,
301            diagnostics,
302            loop_context: Some(context),
303            skill_registry,
304            robot_service: None,
305        }
306    }
307
308    /// Creates a new event loop with explicit diagnostics collector (for testing).
309    pub fn with_diagnostics(
310        mut config: RalphConfig,
311        diagnostics: crate::diagnostics::DiagnosticsCollector,
312    ) -> Self {
313        // Solo mode safety guard: force scratchpad enabled when no hats defined
314        if config.hats.is_empty() && !config.core.scratchpad.enabled {
315            warn!(
316                "core.scratchpad.enabled is false but no hats are defined. \
317                 Scratchpad is the only continuity mechanism in solo mode — forcing enabled."
318            );
319            config.core.scratchpad.enabled = true;
320        }
321
322        let registry = HatRegistry::from_config(&config);
323        let instruction_builder =
324            InstructionBuilder::with_events(config.core.clone(), config.events.clone());
325
326        let mut bus = EventBus::new();
327
328        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
329        // Ralph is ALWAYS registered as the universal fallback for orphaned events.
330        // Custom hats are registered first (higher priority), Ralph catches everything else.
331        for hat in registry.all() {
332            bus.register(hat.clone());
333        }
334
335        // Always register Ralph as catch-all coordinator
336        // Per spec: "Ralph runs when no hat triggered — Universal fallback for orphaned events"
337        let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); // Subscribe to all events
338        bus.register(ralph_hat);
339
340        if registry.is_empty() {
341            debug!("Solo mode: Ralph is the only coordinator");
342        } else {
343            debug!(
344                "Multi-hat mode: {} custom hats + Ralph as fallback",
345                registry.len()
346            );
347        }
348
349        // Build skill registry from config
350        let workspace_root = std::path::Path::new(".");
351        let mut skill_registry = if config.skills.enabled {
352            SkillRegistry::from_config(
353                &config.skills,
354                workspace_root,
355                Some(config.cli.backend.as_str()),
356            )
357            .unwrap_or_else(|e| {
358                warn!(
359                    "Failed to build skill registry: {}, using empty registry",
360                    e
361                );
362                SkillRegistry::new(Some(config.cli.backend.as_str()))
363            })
364        } else {
365            SkillRegistry::new(Some(config.cli.backend.as_str()))
366        };
367
368        // Remove task/memory skills from the index when their config is disabled
369        if !config.tasks.enabled {
370            skill_registry.remove("ralph-tools-tasks");
371        }
372        if !config.memories.enabled {
373            skill_registry.remove("ralph-tools-memories");
374        }
375
376        let skill_index = if config.skills.enabled {
377            skill_registry.build_index(None)
378        } else {
379            String::new()
380        };
381
382        // When memories are enabled, add tasks CLI instructions alongside scratchpad
383        let ralph = HatlessRalph::new(
384            config.event_loop.completion_promise.clone(),
385            config.core.clone(),
386            &registry,
387            config.event_loop.starting_event.clone(),
388        )
389        .with_memories_enabled(config.memories.enabled)
390        .with_skill_index(skill_index);
391
392        // Read events path from marker file, fall back to default if not present
393        // The marker file is written by run_loop_impl() at run startup
394        let events_path = std::fs::read_to_string(".ralph/current-events")
395            .map(|s| s.trim().to_string())
396            .unwrap_or_else(|_| ".ralph/events.jsonl".to_string());
397        let event_reader = EventReader::new(&events_path);
398
399        Self {
400            config,
401            registry,
402            bus,
403            state: LoopState::new(),
404            instruction_builder,
405            ralph,
406            robot_guidance: Vec::new(),
407            event_reader,
408            diagnostics,
409            loop_context: None,
410            skill_registry,
411            robot_service: None,
412        }
413    }
414
    /// Injects a robot service for human-in-the-loop communication.
    ///
    /// Call this after construction to enable `human.interact` event handling,
    /// periodic check-ins, and question/response flow. The service is typically
    /// created by the CLI layer (e.g., `TelegramService`) and injected here,
    /// keeping the core event loop decoupled from any specific communication
    /// platform.
    ///
    /// Any previously injected service is replaced.
    pub fn set_robot_service(&mut self, service: Box<dyn RobotService>) {
        self.robot_service = Some(service);
    }
425
    /// Returns the loop context, if one was provided.
    ///
    /// `None` means the loop was built without a context (legacy single-loop mode).
    pub fn loop_context(&self) -> Option<&LoopContext> {
        self.loop_context.as_ref()
    }
430
431    /// Returns the tasks path based on loop context or default.
432    fn tasks_path(&self) -> PathBuf {
433        self.loop_context
434            .as_ref()
435            .map(|ctx| ctx.tasks_path())
436            .unwrap_or_else(|| PathBuf::from(".ralph/agent/tasks.jsonl"))
437    }
438
439    /// Returns the scratchpad path based on loop context and active scratchpad config.
440    ///
441    /// When a per-hat scratchpad override is active (path differs from global default),
442    /// the custom path is resolved relative to the loop context workspace for worktree
443    /// isolation. When using the default/global path, loop context's standard resolution
444    /// applies.
445    fn scratchpad_path(&self) -> PathBuf {
446        let active_path = &self.ralph.active_scratchpad().path;
447
448        match self.loop_context.as_ref() {
449            Some(ctx) => ctx.workspace().join(active_path),
450            None => PathBuf::from(active_path),
451        }
452    }
453
454    /// Returns the global scratchpad path (ignoring per-hat overrides).
455    /// Used for guidance persistence which is cross-hat state.
456    fn global_scratchpad_path(&self) -> PathBuf {
457        self.loop_context
458            .as_ref()
459            .map(|ctx| ctx.scratchpad_path())
460            .unwrap_or_else(|| PathBuf::from(&self.config.core.scratchpad.path))
461    }
462
    /// Returns a shared reference to the current loop state.
    pub fn state(&self) -> &LoopState {
        &self.state
    }
467
468    /// Resets the stale-loop topic counter.
469    ///
470    /// Call after processing wave results — multiple events with the same topic
471    /// (e.g. `review.done` from parallel workers) are expected and should not
472    /// trigger the stale loop detector.
473    pub fn reset_stale_topic_counter(&mut self) {
474        self.state.consecutive_same_signature = 0;
475        self.state.last_emitted_signature = None;
476    }
477
    /// Returns a shared reference to the configuration held by this loop.
    pub fn config(&self) -> &RalphConfig {
        &self.config
    }
482
    /// Returns a shared reference to the hat registry.
    pub fn registry(&self) -> &HatRegistry {
        &self.registry
    }
487
    /// Records hook telemetry for diagnostics.
    ///
    /// Forwards `entry` unchanged to the diagnostics collector.
    pub fn log_hook_run_telemetry(&self, entry: crate::diagnostics::HookRunTelemetryEntry) {
        self.diagnostics.log_hook_run(entry);
    }
492
    /// Logs the full prompt for an iteration to the diagnostics session.
    ///
    /// Forwards `iteration`, `hat`, and `prompt` unchanged to the diagnostics
    /// collector.
    pub fn log_prompt(&self, iteration: u32, hat: &str, prompt: &str) {
        self.diagnostics.log_prompt(iteration, hat, prompt);
    }
497
498    /// Gets the backend configuration for a hat.
499    ///
500    /// If the hat has a backend configured, returns that.
501    /// Otherwise, returns None (caller should use global backend).
502    pub fn get_hat_backend(&self, hat_id: &HatId) -> Option<&HatBackend> {
503        self.registry
504            .get_config(hat_id)
505            .and_then(|config| config.backend.as_ref())
506    }
507
    /// Adds an observer that receives all published events.
    ///
    /// Multiple observers can be added (e.g., session recorder + TUI).
    /// Each observer is called before events are routed to subscribers.
    ///
    /// The observer is handed to the event bus, so it must be `Send` and
    /// `'static`.
    pub fn add_observer<F>(&mut self, observer: F)
    where
        F: Fn(&Event) + Send + 'static,
    {
        self.bus.add_observer(observer);
    }
518
    /// Sets a single observer, clearing any existing observers.
    ///
    /// Prefer `add_observer` when multiple observers are needed.
    #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
    pub fn set_observer<F>(&mut self, observer: F)
    where
        F: Fn(&Event) + Send + 'static,
    {
        // NOTE(review): the allow suggests the bus-level `set_observer` is
        // itself deprecated — confirm against `EventBus`.
        #[allow(deprecated)]
        self.bus.set_observer(observer);
    }
530
531    /// Checks if any termination condition is met.
532    pub fn check_termination(&self) -> Option<TerminationReason> {
533        let cfg = &self.config.event_loop;
534
535        if self.state.iteration >= cfg.max_iterations {
536            return Some(TerminationReason::MaxIterations);
537        }
538
539        if self.state.elapsed().as_secs() >= cfg.max_runtime_seconds {
540            return Some(TerminationReason::MaxRuntime);
541        }
542
543        if let Some(max_cost) = cfg.max_cost_usd
544            && self.state.cumulative_cost >= max_cost
545        {
546            return Some(TerminationReason::MaxCost);
547        }
548
549        if self.state.consecutive_failures >= cfg.max_consecutive_failures {
550            return Some(TerminationReason::ConsecutiveFailures);
551        }
552
553        // Check for loop thrashing: planner keeps dispatching abandoned tasks
554        if self.state.abandoned_task_redispatches >= 3 {
555            return Some(TerminationReason::LoopThrashing);
556        }
557
558        // Check for validation failures: too many consecutive malformed JSONL lines
559        if self.state.consecutive_malformed_events >= 3 {
560            return Some(TerminationReason::ValidationFailure);
561        }
562
563        // Check for stale loop: same event signature emitted 3+ times in a row
564        if self.state.consecutive_same_signature >= 3 {
565            let topic = self
566                .state
567                .last_emitted_signature
568                .as_ref()
569                .map(|signature| signature.topic.as_str())
570                .unwrap_or("?");
571            warn!(
572                topic,
573                count = self.state.consecutive_same_signature,
574                "Stale loop detected: same event signature emitted consecutively"
575            );
576            return Some(TerminationReason::LoopStale);
577        }
578
579        // Check for stop signal from Telegram /stop or CLI stop-requested
580        let stop_path =
581            std::path::Path::new(&self.config.core.workspace_root).join(".ralph/stop-requested");
582        if stop_path.exists() {
583            let _ = std::fs::remove_file(&stop_path);
584            return Some(TerminationReason::Stopped);
585        }
586
587        // Check for restart signal from Telegram /restart command
588        let restart_path =
589            std::path::Path::new(&self.config.core.workspace_root).join(".ralph/restart-requested");
590        if restart_path.exists() {
591            return Some(TerminationReason::RestartRequested);
592        }
593
594        // Check if workspace directory has been removed (zombie worktree detection)
595        if !std::path::Path::new(&self.config.core.workspace_root).is_dir() {
596            return Some(TerminationReason::WorkspaceGone);
597        }
598
599        None
600    }
601
602    /// Check if a loop.cancel event was detected.
603    ///
604    /// Unlike check_completion_event(), this does NOT validate required_events.
605    /// Cancellation is an explicit abort — it doesn't need the workflow to be complete.
606    pub fn check_cancellation_event(&mut self) -> Option<TerminationReason> {
607        if !self.state.cancellation_requested {
608            return None;
609        }
610        self.state.cancellation_requested = false;
611        info!("Loop cancelled gracefully via loop.cancel event");
612
613        self.diagnostics.log_orchestration(
614            self.state.iteration,
615            "loop",
616            crate::diagnostics::OrchestrationEvent::LoopTerminated {
617                reason: "cancelled".to_string(),
618            },
619        );
620
621        Some(TerminationReason::Cancelled)
622    }
623
    /// Request completion from the text fallback path.
    ///
    /// When a backend outputs a completion promise as plain text (without
    /// using `ralph emit`), this sets `completion_requested = true` so that
    /// `check_completion_event()` can apply all safety checks (persistent mode,
    /// required events, runtime tasks) before terminating.
    ///
    /// This only records the request and logs it; it does not terminate the
    /// loop by itself.
    pub fn request_completion_from_text_fallback(&mut self) {
        self.state.completion_requested = true;
        info!("Completion requested via text fallback (output contained completion promise)");
    }
634
635    /// Checks if a completion event was received and returns termination reason.
636    ///
637    /// Completion is accepted via JSONL events (e.g., `ralph emit`) or via
638    /// [`request_completion_from_text_fallback`].
639    pub fn check_completion_event(&mut self) -> Option<TerminationReason> {
640        if !self.state.completion_requested {
641            return None;
642        }
643
644        // Event chain validation: check required events were seen
645        let required = &self.config.event_loop.required_events;
646        if !required.is_empty() {
647            let missing = self.state.missing_required_events(required);
648            if !missing.is_empty() {
649                warn!(
650                    missing = ?missing,
651                    "Rejecting LOOP_COMPLETE: required events not seen during loop lifetime"
652                );
653                self.state.completion_requested = false;
654
655                // Inject task.resume so the loop continues
656                let resume_payload = format!(
657                    "LOOP_COMPLETE rejected: missing required events: {:?}. \
658                     The agent must complete all workflow phases before emitting LOOP_COMPLETE. \
659                     Use loop.cancel to abort the workflow instead.",
660                    missing
661                );
662                self.bus.publish(Event::new("task.resume", resume_payload));
663                return None;
664            }
665        }
666
667        self.state.completion_requested = false;
668
669        // In persistent mode, suppress completion and keep the loop alive
670        if self.config.event_loop.persistent {
671            info!("Completion event suppressed - persistent mode active, loop staying alive");
672
673            self.diagnostics.log_orchestration(
674                self.state.iteration,
675                "loop",
676                crate::diagnostics::OrchestrationEvent::LoopTerminated {
677                    reason: "completion_event_suppressed_persistent".to_string(),
678                },
679            );
680
681            // Inject a task.resume event so the loop continues with an idle prompt
682            let resume_event = Event::new(
683                "task.resume",
684                "Persistent mode: loop staying alive after completion signal. \
685                 Check for new tasks or await human guidance.",
686            );
687            self.bus.publish(resume_event);
688
689            return None;
690        }
691
692        // Runtime tasks are the canonical queue when memories/tasks mode is enabled.
693        if self.config.memories.enabled {
694            if let Ok(false) = self.verify_tasks_complete() {
695                let open_tasks = self.get_open_task_list();
696                warn!(
697                    open_tasks = ?open_tasks,
698                    "Rejecting completion event with {} open task(s)",
699                    open_tasks.len()
700                );
701                self.bus.publish(Event::new(
702                    "task.resume",
703                    format!(
704                        "Completion rejected: runtime tasks remain open: {:?}. Close, fail, or reopen outstanding tasks before emitting the completion promise.",
705                        open_tasks
706                    ),
707                ));
708                return None;
709            }
710        } else if let Ok(false) = self.verify_scratchpad_complete() {
711            warn!("Completion event with pending scratchpad tasks - trusting agent decision");
712        }
713
714        info!("Completion event detected - terminating");
715
716        // Log loop terminated
717        self.diagnostics.log_orchestration(
718            self.state.iteration,
719            "loop",
720            crate::diagnostics::OrchestrationEvent::LoopTerminated {
721                reason: "completion_event".to_string(),
722            },
723        );
724
725        Some(TerminationReason::CompletionPromise)
726    }
727
728    /// Initializes the loop by publishing the start event.
729    pub fn initialize(&mut self, prompt_content: &str) {
730        // Use configured starting_event or default to task.start for backward compatibility
731        let topic = self
732            .config
733            .event_loop
734            .starting_event
735            .clone()
736            .unwrap_or_else(|| "task.start".to_string());
737        self.initialize_with_topic(&topic, prompt_content);
738    }
739
    /// Initializes the loop for resume mode by publishing task.resume.
    ///
    /// Per spec: "User can run `ralph resume` to restart reading existing scratchpad."
    /// The planner should read the existing scratchpad rather than doing fresh gap analysis.
    ///
    /// The configured `starting_event` is deliberately ignored here.
    pub fn initialize_resume(&mut self, prompt_content: &str) {
        // Resume always uses task.resume regardless of starting_event config
        self.initialize_with_topic("task.resume", prompt_content);
    }
748
749    /// Common initialization logic with configurable topic.
750    fn initialize_with_topic(&mut self, topic: &str, prompt_content: &str) {
751        // Store the objective so it persists across all iterations.
752        // After iteration 1, bus.take_pending() consumes the start event,
753        // so without this the objective would be invisible to later hats.
754        self.ralph.set_objective(prompt_content.to_string());
755
756        let start_event = Event::new(topic, prompt_content);
757        self.bus.publish(start_event);
758        debug!(topic = topic, "Published {} event", topic);
759    }
760
761    /// Gets the next hat to execute (if any have pending events).
762    ///
763    /// Per "Hatless Ralph" architecture: When custom hats are defined, Ralph is
764    /// always the executor. Custom hats define topology (pub/sub contracts) that
765    /// Ralph uses for coordination context, but Ralph handles all iterations.
766    ///
767    /// - Solo mode (no custom hats): Returns "ralph" if Ralph has pending events
768    /// - Multi-hat mode (custom hats defined): Always returns "ralph" if ANY hat has pending events
769    pub fn next_hat(&self) -> Option<&HatId> {
770        let next = self.bus.next_hat_with_pending();
771
772        // If no pending hat events but human interactions are pending, route to Ralph.
773        if next.is_none() && self.bus.has_human_pending() {
774            return self.bus.hat_ids().find(|id| id.as_str() == "ralph");
775        }
776
777        // If no pending events, return None
778        next.as_ref()?;
779
780        // In multi-hat mode, always route to Ralph (custom hats define topology only)
781        // Ralph's prompt includes the ## HATS section for coordination awareness
782        if self.registry.is_empty() {
783            // Solo mode - return the next hat (which is "ralph")
784            next
785        } else {
786            // Return "ralph" - the constant coordinator
787            // Find ralph in the bus's registered hats
788            self.bus.hat_ids().find(|id| id.as_str() == "ralph")
789        }
790    }
791
792    /// Advances the event reader to the current end of the events file.
793    ///
794    /// Call this after writing observability records (e.g. start event) to the
795    /// events JSONL file so they are not re-read by `process_events_from_jsonl`.
796    /// The start event is already published to the bus via `initialize()`, so
797    /// re-reading it from the file would cause double-delivery.
798    pub fn sync_event_reader_to_file_end(&mut self) {
799        let path = self.event_reader.path();
800        if let Ok(metadata) = std::fs::metadata(path) {
801            self.event_reader.set_position(metadata.len());
802        }
803    }
804
805    /// Checks if any hats have pending events.
806    ///
807    /// Use this after `process_output` to detect if the LLM failed to publish an event.
808    /// If false after processing, the loop will terminate on the next iteration.
809    pub fn has_pending_events(&self) -> bool {
810        self.bus.next_hat_with_pending().is_some() || self.bus.has_human_pending()
811    }
812
813    /// Checks if any pending events are human-related (human.response, human.guidance).
814    ///
815    /// Used to skip cooldown delays when a human event is next, since we don't
816    /// want to artificially delay the response to a human interaction.
817    pub fn has_pending_human_events(&self) -> bool {
818        self.bus.has_human_pending()
819    }
820
821    /// Injects `human.guidance` events directly into the in-memory bus.
822    ///
823    /// This is used for local TUI/RPC guidance so the next prompt boundary
824    /// sees the message immediately without waiting for a JSONL reread.
825    pub fn inject_human_guidance<I, S>(&mut self, messages: I)
826    where
827        I: IntoIterator<Item = S>,
828        S: Into<String>,
829    {
830        for message in messages {
831            let event = Event::new("human.guidance", message.into());
832            self.state.record_event(&event);
833            self.bus.publish(event);
834        }
835    }
836
837    /// Returns whether unread JSONL events include any semantic `plan.*` topics.
838    ///
839    /// This allows callers to dispatch `pre.plan.created` hooks before
840    /// event publication handling without consuming unread events.
841    pub fn has_pending_plan_events_in_jsonl(&self) -> std::io::Result<bool> {
842        let result = self.event_reader.peek_new_events()?;
843        Ok(result
844            .events
845            .iter()
846            .any(|event| event.topic.starts_with("plan.")))
847    }
848
849    /// Returns structured context for the first unread `human.interact` event,
850    /// if one is present in JSONL without consuming reader state.
851    pub fn pending_human_interact_context_in_jsonl(&self) -> std::io::Result<Option<Value>> {
852        let result = self.event_reader.peek_new_events()?;
853        Ok(result
854            .events
855            .iter()
856            .find(|event| event.topic == "human.interact")
857            .map(|event| {
858                Self::parse_human_interact_context(event.payload.as_deref().unwrap_or_default())
859            }))
860    }
861
862    /// Gets the topics a hat is allowed to publish.
863    ///
864    /// Used to build retry prompts when the LLM forgets to publish an event.
865    pub fn get_hat_publishes(&self, hat_id: &HatId) -> Vec<String> {
866        self.registry
867            .get(hat_id)
868            .map(|hat| hat.publishes.iter().map(|t| t.to_string()).collect())
869            .unwrap_or_default()
870    }
871
872    /// Injects a fallback event to recover from a stalled loop.
873    ///
874    /// When no hats have pending events (agent failed to publish), this method
875    /// injects a `task.resume` event which Ralph will handle to attempt recovery.
876    ///
877    /// Returns true if a fallback event was injected, false if recovery is not possible.
878    pub fn inject_fallback_event(&mut self) -> bool {
879        // If a custom hat was last executing, target the fallback back to it
880        // This preserves hat context instead of always falling back to Ralph
881        let fallback_event = match &self.state.last_hat {
882            Some(hat_id) if hat_id.as_str() != "ralph" => {
883                let publishes = self.get_hat_publishes(hat_id);
884                let payload = if publishes.is_empty() {
885                    format!(
886                        "RECOVERY: Previous iteration by hat `{}` did not publish an event. \
887                         Emit exactly one valid next event via `ralph emit`, or stop only after \
888                         publishing the configured completion event.",
889                        hat_id.as_str()
890                    )
891                } else {
892                    format!(
893                        "RECOVERY: Previous iteration by hat `{}` did not publish an event. \
894                         This failed because no event was emitted. Emit exactly ONE valid next \
895                         event via `ralph emit`. Allowed topics: `{}`. Do not only write prose \
896                         or update files. Stop immediately after emitting.",
897                        hat_id.as_str(),
898                        publishes.join("`, `")
899                    )
900                };
901
902                debug!(
903                    hat = %hat_id.as_str(),
904                    "Injecting fallback event to recover - targeting last hat with task.resume"
905                );
906                Event::new("task.resume", payload).with_target(hat_id.clone())
907            }
908            _ => {
909                debug!("Injecting fallback event to recover - triggering Ralph with task.resume");
910                Event::new(
911                    "task.resume",
912                    "RECOVERY: Previous iteration did not publish an event. \
913                     Review the scratchpad and either dispatch the next task or complete the loop.",
914                )
915            }
916        };
917
918        self.bus.publish(fallback_event);
919        true
920    }
921
    /// Builds the prompt for a hat's execution.
    ///
    /// Per "Hatless Ralph" architecture:
    /// - Solo mode: Ralph handles everything with his own prompt
    /// - Multi-hat mode: Ralph is the sole executor, custom hats define topology only
    ///
    /// When in multi-hat mode, this method collects ALL pending events across all hats
    /// and builds Ralph's prompt with that context. The `## HATS` section in Ralph's
    /// prompt documents the topology for coordination awareness.
    ///
    /// In both "ralph" branches the base prompt is passed, in this order, through
    /// `prepend_auto_inject_skills`, `prepend_scratchpad` and `prepend_ready_tasks`.
    ///
    /// # Side effects
    ///
    /// Pending events are taken from the bus (and so consumed) while building the
    /// prompt. `human.guidance` events are split off from the regular events and
    /// handed to `update_robot_guidance` instead of being formatted into the
    /// events context. In multi-hat mode, hat activations are recorded and
    /// `state.last_active_hat_ids` is updated.
    ///
    /// # Returns
    ///
    /// `Some(prompt)` for the "ralph" hat. For any other hat id, `Some(prompt)`
    /// built by `build_custom_hat`, or `None` when the hat is not in the registry.
    pub fn build_prompt(&mut self, hat_id: &HatId) -> Option<String> {
        // Handle "ralph" hat - the constant coordinator
        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
        if hat_id.as_str() == "ralph" {
            if self.registry.is_empty() {
                // Solo mode - just Ralph's events, no hats to filter
                let mut events = self.bus.take_pending(&hat_id.clone());
                let mut human_events = self.bus.take_human_pending();
                events.append(&mut human_events);

                // Separate human.guidance events from regular events
                let (guidance_events, regular_events): (Vec<_>, Vec<_>) = events
                    .into_iter()
                    .partition(|e| e.topic.as_str() == "human.guidance");

                let events_context = regular_events
                    .iter()
                    .map(|e| Self::format_event(e))
                    .collect::<Vec<_>>()
                    .join("\n");

                // Solo mode: set scratchpad and iteration before guidance persistence
                self.ralph
                    .set_active_scratchpad(self.config.core.scratchpad.clone());
                self.ralph.set_iteration(self.state.iteration);

                // Persist and inject human guidance into prompt if present
                self.update_robot_guidance(guidance_events);
                self.apply_robot_guidance();

                // Build base prompt and prepend memories + scratchpad + ready tasks
                let base_prompt = self.ralph.build_prompt(&events_context, &[]);
                // Guidance set on Ralph is cleared once the base prompt is built.
                self.ralph.clear_robot_guidance();
                let with_skills = self.prepend_auto_inject_skills(base_prompt);
                let with_scratchpad = self.prepend_scratchpad(with_skills);
                let final_prompt = self.prepend_ready_tasks(with_scratchpad);

                debug!("build_prompt: routing to HatlessRalph (solo mode)");
                return Some(final_prompt);
            } else {
                // Multi-hat mode: collect events and determine active hats
                let mut all_hat_ids: Vec<HatId> = self.bus.hat_ids().cloned().collect();
                // Deterministic ordering (avoid HashMap iteration order nondeterminism).
                all_hat_ids.sort_by(|a, b| a.as_str().cmp(b.as_str()));

                // `all_events` feeds the prompt; `system_events` are republished below.
                let mut all_events = Vec::new();
                let mut system_events = Vec::new();

                for id in &all_hat_ids {
                    let pending = self.bus.take_pending(id);
                    if pending.is_empty() {
                        continue;
                    }

                    let (drop_pending, exhausted_event) = self.check_hat_exhaustion(id, &pending);
                    if drop_pending {
                        // Drop the pending events that would have activated the hat.
                        if let Some(exhausted_event) = exhausted_event {
                            all_events.push(exhausted_event.clone());
                            system_events.push(exhausted_event);
                        }
                        continue;
                    }

                    all_events.extend(pending);
                }

                let mut human_events = self.bus.take_human_pending();
                all_events.append(&mut human_events);

                // Publish orchestrator-generated system events after consuming pending events,
                // so they become visible in the event log and can be handled next iteration.
                for event in system_events {
                    self.bus.publish(event);
                }

                // Separate human.guidance events from regular events
                let (guidance_events, regular_events): (Vec<_>, Vec<_>) = all_events
                    .into_iter()
                    .partition(|e| e.topic.as_str() == "human.guidance");

                // Ignore kickoff/recovery noise when a real downstream event is pending.
                let effective_regular_events = self.effective_regular_events(&regular_events);

                // Determine which hats are active based on regular events
                // (the unfiltered `regular_events`, not `effective_regular_events`).
                let active_hat_ids = self.determine_active_hat_ids(&regular_events);
                self.record_hat_activations(&active_hat_ids);
                self.state.last_active_hat_ids = active_hat_ids.clone();

                // Resolve scratchpad config for the active hat (or global default).
                // Must happen BEFORE guidance persistence so guidance is written
                // to the correct hat's scratchpad file.
                // Only the first active hat id is consulted.
                let resolved_scratchpad = if let Some(hat_id) = active_hat_ids.first() {
                    let hat_scratchpad = self
                        .registry
                        .get_config(hat_id)
                        .and_then(|c| c.scratchpad.as_ref());
                    ScratchpadConfig::resolve(hat_scratchpad, &self.config.core.scratchpad)
                } else {
                    // Ralph coordinating — use global
                    self.config.core.scratchpad.clone()
                };
                self.ralph.set_active_scratchpad(resolved_scratchpad);
                self.ralph.set_iteration(self.state.iteration);

                // Persist and inject human guidance after scratchpad resolution
                // (must also happen before immutable borrows from determine_active_hats)
                self.update_robot_guidance(guidance_events);
                self.apply_robot_guidance();

                let active_hats = self.determine_active_hats(&regular_events);

                // Format events for context
                let events_context = effective_regular_events
                    .iter()
                    .map(|e| Self::format_event(e))
                    .collect::<Vec<_>>()
                    .join("\n");

                // Build base prompt and prepend memories + scratchpad if available
                let base_prompt = self.ralph.build_prompt(&events_context, &active_hats);

                // Build prompt with active hats - filters instructions to only active hats
                debug!(
                    "build_prompt: routing to HatlessRalph (multi-hat coordinator mode), active_hats: {:?}",
                    active_hats
                        .iter()
                        .map(|h| h.id.as_str())
                        .collect::<Vec<_>>()
                );

                // Clear guidance after active_hats references are no longer needed
                self.ralph.clear_robot_guidance();
                let with_skills = self.prepend_auto_inject_skills(base_prompt);
                let with_scratchpad = self.prepend_scratchpad(with_skills);
                let final_prompt = self.prepend_ready_tasks(with_scratchpad);

                return Some(final_prompt);
            }
        }

        // Non-ralph hat requested - this shouldn't happen in multi-hat mode since
        // next_hat() always returns "ralph" when custom hats are defined.
        // But we keep this code path for backward compatibility and tests.
        // NOTE(review): pending events are taken from the bus before the registry
        // lookup below, so they are consumed even when the lookup returns `None`.
        let events = self.bus.take_pending(&hat_id.clone());
        let events_context = events
            .iter()
            .map(|e| Self::format_event(e))
            .collect::<Vec<_>>()
            .join("\n");

        // Unknown hat: bail out with `None`.
        let hat = self.registry.get(hat_id)?;

        // Debug logging to trace hat routing
        debug!(
            "build_prompt: hat_id='{}', instructions.is_empty()={}",
            hat_id.as_str(),
            hat.instructions.is_empty()
        );

        // All hats use build_custom_hat with ghuntley-style prompts
        debug!(
            "build_prompt: routing to build_custom_hat() for '{}'",
            hat_id.as_str()
        );
        Some(
            self.instruction_builder
                .build_custom_hat(hat, &events_context),
        )
    }
1105
1106    /// Stores guidance payloads, persists them to scratchpad, and prepares them for prompt injection.
1107    ///
1108    /// Guidance events are ephemeral in the event bus (consumed by `take_pending`).
1109    /// This method both caches them in memory for prompt injection and appends
1110    /// them to the scratchpad file so they survive across process restarts.
1111    fn update_robot_guidance(&mut self, guidance_events: Vec<Event>) {
1112        if guidance_events.is_empty() {
1113            return;
1114        }
1115
1116        // Persist new guidance to scratchpad before caching
1117        self.persist_guidance_to_scratchpad(&guidance_events);
1118
1119        self.robot_guidance
1120            .extend(guidance_events.into_iter().map(|e| e.payload));
1121    }
1122
1123    /// Appends human guidance entries to the scratchpad file for durability.
1124    ///
1125    /// Each guidance message is written as a timestamped markdown entry so it
1126    /// appears alongside the agent's own thinking and survives process restarts.
1127    ///
1128    /// When scratchpad is disabled for the current hat, persists to the global
1129    /// scratchpad path (guidance is cross-hat state). If global is also disabled,
1130    /// skips persistence.
1131    fn persist_guidance_to_scratchpad(&self, guidance_events: &[Event]) {
1132        use std::io::Write;
1133
1134        // When hat scratchpad is disabled, fall back to global scratchpad
1135        let scratchpad_path = if self.ralph.active_scratchpad().enabled {
1136            self.scratchpad_path()
1137        } else {
1138            if !self.config.core.scratchpad.enabled {
1139                debug!("Both hat and global scratchpad disabled, skipping guidance persistence");
1140                return;
1141            }
1142            self.global_scratchpad_path()
1143        };
1144        let resolved_path = if scratchpad_path.is_relative() {
1145            self.config.core.workspace_root.join(&scratchpad_path)
1146        } else {
1147            scratchpad_path
1148        };
1149
1150        // Create parent directories if needed
1151        if let Some(parent) = resolved_path.parent()
1152            && !parent.exists()
1153            && let Err(e) = std::fs::create_dir_all(parent)
1154        {
1155            warn!("Failed to create scratchpad directory: {}", e);
1156            return;
1157        }
1158
1159        let mut file = match std::fs::OpenOptions::new()
1160            .create(true)
1161            .append(true)
1162            .open(&resolved_path)
1163        {
1164            Ok(f) => f,
1165            Err(e) => {
1166                warn!("Failed to open scratchpad for guidance persistence: {}", e);
1167                return;
1168            }
1169        };
1170
1171        let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
1172        for event in guidance_events {
1173            let entry = format!(
1174                "\n### HUMAN GUIDANCE ({})\n\n{}\n",
1175                timestamp, event.payload
1176            );
1177            if let Err(e) = file.write_all(entry.as_bytes()) {
1178                warn!("Failed to write guidance to scratchpad: {}", e);
1179            }
1180        }
1181
1182        info!(
1183            count = guidance_events.len(),
1184            "Persisted human guidance to scratchpad"
1185        );
1186    }
1187
1188    /// Injects cached guidance into the next prompt build.
1189    fn apply_robot_guidance(&mut self) {
1190        if self.robot_guidance.is_empty() {
1191            return;
1192        }
1193
1194        self.ralph.set_robot_guidance(self.robot_guidance.clone());
1195    }
1196
1197    /// Prepends auto-injected skill content to the prompt.
1198    ///
1199    /// This generalizes the former `prepend_memories()` into a skill auto-injection
1200    /// pipeline that handles memories, tools, and any other auto-inject skills.
1201    ///
1202    /// Injection order:
1203    /// 1. Memory data + ralph-tools skill (special case: loads memory data from store, applies budget)
1204    /// 2. RObot interaction skill (gated by `robot.enabled`)
1205    /// 3. Other auto-inject skills from the registry (wrapped in XML tags)
1206    fn prepend_auto_inject_skills(&self, prompt: String) -> String {
1207        let mut prefix = String::new();
1208
1209        // 1. Memory data + ralph-tools skill — special case with data loading
1210        self.inject_memories_and_tools_skill(&mut prefix);
1211
1212        // 2. RObot interaction skill — gated by robot.enabled
1213        self.inject_robot_skill(&mut prefix);
1214
1215        // 3. Other auto-inject skills from the registry
1216        self.inject_custom_auto_skills(&mut prefix);
1217
1218        if prefix.is_empty() {
1219            return prompt;
1220        }
1221
1222        prefix.push_str("\n\n");
1223        prefix.push_str(&prompt);
1224        prefix
1225    }
1226
1227    /// Injects memory data and the ralph-tools skill into the prefix.
1228    ///
1229    /// Special case: loads memory entries from the store, applies budget
1230    /// truncation, then appends the ralph-tools skill content (which covers
1231    /// both tasks and memories CLI usage).
1232    /// Memory data is gated by `memories.enabled && memories.inject == Auto`.
1233    /// The ralph-tools skill is injected when either memories or tasks are enabled.
1234    fn inject_memories_and_tools_skill(&self, prefix: &mut String) {
1235        let memories_config = &self.config.memories;
1236
1237        // Inject memory DATA if memories are enabled with auto-inject
1238        if memories_config.enabled && memories_config.inject == InjectMode::Auto {
1239            info!(
1240                "Memory injection check: enabled={}, inject={:?}, workspace_root={:?}",
1241                memories_config.enabled, memories_config.inject, self.config.core.workspace_root
1242            );
1243
1244            let workspace_root = &self.config.core.workspace_root;
1245            let store = MarkdownMemoryStore::with_default_path(workspace_root);
1246            let memories_path = workspace_root.join(".ralph/agent/memories.md");
1247
1248            info!(
1249                "Looking for memories at: {:?} (exists: {})",
1250                memories_path,
1251                memories_path.exists()
1252            );
1253
1254            let memories = match store.load() {
1255                Ok(memories) => {
1256                    info!("Successfully loaded {} memories from store", memories.len());
1257                    memories
1258                }
1259                Err(e) => {
1260                    info!(
1261                        "Failed to load memories for injection: {} (path: {:?})",
1262                        e, memories_path
1263                    );
1264                    Vec::new()
1265                }
1266            };
1267
1268            if memories.is_empty() {
1269                info!("Memory store is empty - no memories to inject");
1270            } else {
1271                let mut memories_content = format_memories_as_markdown(&memories);
1272
1273                if memories_config.budget > 0 {
1274                    let original_len = memories_content.len();
1275                    memories_content =
1276                        truncate_to_budget(&memories_content, memories_config.budget);
1277                    debug!(
1278                        "Applied budget: {} chars -> {} chars (budget: {})",
1279                        original_len,
1280                        memories_content.len(),
1281                        memories_config.budget
1282                    );
1283                }
1284
1285                info!(
1286                    "Injecting {} memories ({} chars) into prompt",
1287                    memories.len(),
1288                    memories_content.len()
1289                );
1290
1291                prefix.push_str(&memories_content);
1292            }
1293        }
1294
1295        // Inject ralph-tools skills conditionally based on config
1296        let tasks_enabled = self.config.tasks.enabled;
1297
1298        // Base skill (shared commands) when either memories or tasks are enabled
1299        if (memories_config.enabled || tasks_enabled)
1300            && let Some(skill) = self.skill_registry.get("ralph-tools")
1301        {
1302            if !prefix.is_empty() {
1303                prefix.push_str("\n\n");
1304            }
1305            prefix.push_str(&format!(
1306                "<ralph-tools-skill>\n{}\n</ralph-tools-skill>",
1307                skill.content.trim()
1308            ));
1309            debug!("Injected ralph-tools skill from registry");
1310        }
1311
1312        // Tasks skill — only when tasks are enabled
1313        if tasks_enabled && let Some(skill) = self.skill_registry.get("ralph-tools-tasks") {
1314            if !prefix.is_empty() {
1315                prefix.push_str("\n\n");
1316            }
1317            prefix.push_str(&format!(
1318                "<ralph-tools-tasks-skill>\n{}\n</ralph-tools-tasks-skill>",
1319                skill.content.trim()
1320            ));
1321            debug!("Injected ralph-tools-tasks skill from registry");
1322        }
1323
1324        // Memories skill — only when memories are enabled
1325        if memories_config.enabled
1326            && let Some(skill) = self.skill_registry.get("ralph-tools-memories")
1327        {
1328            if !prefix.is_empty() {
1329                prefix.push_str("\n\n");
1330            }
1331            prefix.push_str(&format!(
1332                "<ralph-tools-memories-skill>\n{}\n</ralph-tools-memories-skill>",
1333                skill.content.trim()
1334            ));
1335            debug!("Injected ralph-tools-memories skill from registry");
1336        }
1337    }
1338
1339    /// Injects the RObot interaction skill content into the prefix.
1340    ///
1341    /// Gated by `robot.enabled`. Teaches agents how and when to interact
1342    /// with humans via `human.interact` events.
1343    fn inject_robot_skill(&self, prefix: &mut String) {
1344        if !self.config.robot.enabled {
1345            return;
1346        }
1347
1348        if let Some(skill) = self.skill_registry.get("robot-interaction") {
1349            if !prefix.is_empty() {
1350                prefix.push_str("\n\n");
1351            }
1352            prefix.push_str(&format!(
1353                "<robot-skill>\n{}\n</robot-skill>",
1354                skill.content.trim()
1355            ));
1356            debug!("Injected robot interaction skill from registry");
1357        }
1358    }
1359
1360    /// Injects any user-configured auto-inject skills (excluding built-in skills handled separately).
1361    fn inject_custom_auto_skills(&self, prefix: &mut String) {
1362        for skill in self.skill_registry.auto_inject_skills(None) {
1363            // Skip built-in skills handled above
1364            if matches!(
1365                skill.name.as_str(),
1366                "ralph-tools" | "ralph-tools-tasks" | "ralph-tools-memories" | "robot-interaction"
1367            ) {
1368                continue;
1369            }
1370
1371            if !prefix.is_empty() {
1372                prefix.push_str("\n\n");
1373            }
1374            prefix.push_str(&format!(
1375                "<{name}-skill>\n{content}\n</{name}-skill>",
1376                name = skill.name,
1377                content = skill.content.trim()
1378            ));
1379            debug!("Injected auto-inject skill: {}", skill.name);
1380        }
1381    }
1382
1383    /// Prepends scratchpad content to the prompt if the file exists and is non-empty.
1384    ///
1385    /// The scratchpad is the agent's working memory for the current objective.
1386    /// Auto-injecting saves one tool call per iteration.
1387    /// When the file exceeds the budget, the TAIL is kept (most recent entries).
1388    fn prepend_scratchpad(&self, prompt: String) -> String {
1389        // Skip injection when scratchpad is disabled for the current hat
1390        if !self.ralph.active_scratchpad().enabled {
1391            return prompt;
1392        }
1393
1394        let scratchpad_path = self.scratchpad_path();
1395
1396        let resolved_path = if scratchpad_path.is_relative() {
1397            self.config.core.workspace_root.join(&scratchpad_path)
1398        } else {
1399            scratchpad_path
1400        };
1401
1402        if !resolved_path.exists() {
1403            debug!(
1404                "Scratchpad not found at {:?}, skipping injection",
1405                resolved_path
1406            );
1407            return prompt;
1408        }
1409
1410        let content = match std::fs::read_to_string(&resolved_path) {
1411            Ok(c) => c,
1412            Err(e) => {
1413                info!("Failed to read scratchpad for injection: {}", e);
1414                return prompt;
1415            }
1416        };
1417
1418        if content.trim().is_empty() {
1419            debug!("Scratchpad is empty, skipping injection");
1420            return prompt;
1421        }
1422
1423        // Budget: 4000 tokens ~16000 chars. Keep the TAIL (most recent content).
1424        let char_budget = 4000 * 4;
1425        let content = if content.len() > char_budget {
1426            // Find a line boundary near the start of the tail
1427            let start = content.len() - char_budget;
1428            // Ensure we start at a valid UTF-8 character boundary
1429            let start = floor_char_boundary(&content, start);
1430            let line_start = content[start..].find('\n').map_or(start, |n| start + n + 1);
1431            let discarded = &content[..line_start];
1432
1433            // Summarize discarded content by extracting markdown headings
1434            let headings: Vec<&str> = discarded
1435                .lines()
1436                .filter(|line| line.starts_with('#'))
1437                .collect();
1438            let summary = if headings.is_empty() {
1439                format!(
1440                    "<!-- earlier content truncated ({} chars omitted) -->",
1441                    line_start
1442                )
1443            } else {
1444                format!(
1445                    "<!-- earlier content truncated ({} chars omitted) -->\n\
1446                     <!-- discarded sections: {} -->",
1447                    line_start,
1448                    headings.join(" | ")
1449                )
1450            };
1451
1452            format!("{}\n\n{}", summary, &content[line_start..])
1453        } else {
1454            content
1455        };
1456
1457        info!("Injecting scratchpad ({} chars) into prompt", content.len());
1458
1459        let mut final_prompt = format!(
1460            "<scratchpad path=\"{}\">\n{}\n</scratchpad>\n\n",
1461            self.ralph.active_scratchpad().path,
1462            content
1463        );
1464        final_prompt.push_str(&prompt);
1465        final_prompt
1466    }
1467
1468    /// Prepends ready tasks to the prompt if tasks are enabled and any exist.
1469    ///
1470    /// Loads the task store and formats ready (unblocked, open) tasks into
1471    /// a `<ready-tasks>` XML block. This saves the agent a tool call per
1472    /// iteration and puts tasks at the same prominence as the scratchpad.
1473    fn prepend_ready_tasks(&self, prompt: String) -> String {
1474        if !self.config.tasks.enabled {
1475            return prompt;
1476        }
1477
1478        use crate::task::TaskStatus;
1479        use crate::task_store::TaskStore;
1480
1481        let tasks_path = self.tasks_path();
1482        let resolved_path = if tasks_path.is_relative() {
1483            self.config.core.workspace_root.join(&tasks_path)
1484        } else {
1485            tasks_path
1486        };
1487
1488        if !resolved_path.exists() {
1489            return prompt;
1490        }
1491
1492        let store = match TaskStore::load(&resolved_path) {
1493            Ok(s) => s,
1494            Err(e) => {
1495                info!("Failed to load task store for injection: {}", e);
1496                return prompt;
1497            }
1498        };
1499
1500        let current_loop_id = self.current_loop_id();
1501
1502        let ready = Self::filter_tasks_by_loop(store.ready(), current_loop_id.as_deref());
1503        let open = Self::filter_tasks_by_loop(store.open(), current_loop_id.as_deref());
1504        let all_count =
1505            Self::filter_tasks_by_loop(store.all().iter().collect(), current_loop_id.as_deref())
1506                .len();
1507        let closed_count = all_count - open.len();
1508
1509        if open.is_empty() && closed_count == 0 {
1510            return prompt;
1511        }
1512
1513        let mut section = String::from("<ready-tasks>\n");
1514        if ready.is_empty() && open.is_empty() {
1515            section.push_str("No open tasks. Create tasks with `ralph tools task add`.\n");
1516        } else {
1517            section.push_str(&format!(
1518                "## Tasks: {} ready, {} open, {} closed\n\n",
1519                ready.len(),
1520                open.len(),
1521                closed_count
1522            ));
1523            for task in &ready {
1524                let status_icon = match task.status {
1525                    TaskStatus::Open => "[ ]",
1526                    TaskStatus::InProgress => "[~]",
1527                    _ => "[?]",
1528                };
1529                section.push_str(&format!(
1530                    "- {} [P{}] {} ({}){}\n",
1531                    status_icon,
1532                    task.priority,
1533                    task.title,
1534                    task.id,
1535                    task.key
1536                        .as_deref()
1537                        .map(|key| format!(" — key: {key}"))
1538                        .unwrap_or_default()
1539                ));
1540            }
1541            // Show blocked tasks separately so agent knows they exist
1542            let ready_ids: Vec<&str> = ready.iter().map(|t| t.id.as_str()).collect();
1543            let blocked: Vec<_> = open
1544                .iter()
1545                .filter(|t| !ready_ids.contains(&t.id.as_str()))
1546                .collect();
1547            if !blocked.is_empty() {
1548                section.push_str("\nBlocked:\n");
1549                for task in blocked {
1550                    section.push_str(&format!(
1551                        "- [blocked] [P{}] {} ({}){} — blocked by: {}\n",
1552                        task.priority,
1553                        task.title,
1554                        task.id,
1555                        task.key
1556                            .as_deref()
1557                            .map(|key| format!(" — key: {key}"))
1558                            .unwrap_or_default(),
1559                        task.blocked_by.join(", ")
1560                    ));
1561                }
1562            }
1563        }
1564        section.push_str("</ready-tasks>\n\n");
1565
1566        info!(
1567            "Injecting ready tasks ({} ready, {} open, {} closed) into prompt",
1568            ready.len(),
1569            open.len(),
1570            closed_count
1571        );
1572
1573        let mut final_prompt = section;
1574        final_prompt.push_str(&prompt);
1575        final_prompt
1576    }
1577
1578    /// Builds the Ralph prompt (coordination mode).
1579    pub fn build_ralph_prompt(&self, prompt_content: &str) -> String {
1580        self.ralph.build_prompt(prompt_content, &[])
1581    }
1582
1583    /// Determines which hats should be active based on pending events.
1584    /// Returns list of Hat references that are triggered by any pending event.
1585    fn determine_active_hats(&self, events: &[Event]) -> Vec<&Hat> {
1586        let mut active_hats = Vec::new();
1587        for id in self.determine_active_hat_ids(events) {
1588            if let Some(hat) = self.registry.get(&id) {
1589                active_hats.push(hat);
1590            }
1591        }
1592        active_hats
1593    }
1594
1595    fn determine_active_hat_ids(&self, events: &[Event]) -> Vec<HatId> {
1596        let mut entrypoint_hat_ids = Vec::new();
1597        let mut progressed_hat_ids = Vec::new();
1598        for event in events {
1599            // Prefer direct event target over topic-based lookup
1600            let hat_id = if let Some(target) = &event.target
1601                && self.registry.get(target).is_some()
1602            {
1603                target.clone()
1604            } else if let Some(hat) = self.registry.get_for_topic(event.topic.as_str()) {
1605                hat.id.clone()
1606            } else {
1607                continue;
1608            };
1609
1610            let list = if self.is_entrypoint_topic(event.topic.as_str()) {
1611                &mut entrypoint_hat_ids
1612            } else {
1613                &mut progressed_hat_ids
1614            };
1615            if !list.iter().any(|id| id == &hat_id) {
1616                list.push(hat_id);
1617            }
1618        }
1619        // Prefer progressed hats over entrypoint hats. Entrypoint events
1620        // (starting_event, task.start, task.resume) linger in the bus after
1621        // the first hat runs. Including them would re-activate the first hat
1622        // alongside downstream hats, confusing the agent with multiple hat
1623        // instructions when only the downstream hat should run.
1624        if progressed_hat_ids.is_empty() {
1625            entrypoint_hat_ids
1626        } else {
1627            progressed_hat_ids
1628        }
1629    }
1630
1631    fn effective_regular_events<'a>(&self, events: &'a [Event]) -> Vec<&'a Event> {
1632        let has_downstream_event = events
1633            .iter()
1634            .any(|event| !Self::is_kickoff_or_recovery_event(event.topic.as_str()));
1635        events
1636            .iter()
1637            .filter(|event| {
1638                !has_downstream_event || !Self::is_kickoff_or_recovery_event(event.topic.as_str())
1639            })
1640            .collect()
1641    }
1642
1643    fn is_kickoff_or_recovery_event(topic: &str) -> bool {
1644        topic == "task.start" || topic == "task.resume" || topic.strip_suffix(".start").is_some()
1645    }
1646
1647    fn is_entrypoint_topic(&self, topic: &str) -> bool {
1648        topic == "task.start"
1649            || topic == "task.resume"
1650            || topic.strip_suffix(".start").is_some()
1651            || self.config.event_loop.starting_event.as_deref() == Some(topic)
1652    }
1653
1654    fn peek_pending_regular_events(&self) -> Vec<Event> {
1655        let mut events = Vec::new();
1656        for hat_id in self.bus.hat_ids() {
1657            if let Some(pending) = self.bus.peek_pending(hat_id) {
1658                events.extend(pending.iter().cloned());
1659            }
1660        }
1661        events
1662    }
1663
1664    /// Formats an event for prompt context.
1665    ///
1666    /// For top-level prompts (task.start, task.resume), wraps the payload in
1667    /// `<top-level-prompt>` XML tags to clearly delineate the user's original request.
1668    fn format_event(event: &Event) -> String {
1669        let topic = &event.topic;
1670        let payload = &event.payload;
1671
1672        if topic.as_str() == "task.start" || topic.as_str() == "task.resume" {
1673            format!(
1674                "Event: {} - <top-level-prompt>\n{}\n</top-level-prompt>",
1675                topic, payload
1676            )
1677        } else {
1678            format!("Event: {} - {}", topic, payload)
1679        }
1680    }
1681
1682    fn check_hat_exhaustion(&mut self, hat_id: &HatId, dropped: &[Event]) -> (bool, Option<Event>) {
1683        let Some(config) = self.registry.get_config(hat_id) else {
1684            return (false, None);
1685        };
1686        let Some(max) = config.max_activations else {
1687            return (false, None);
1688        };
1689
1690        let count = *self.state.hat_activation_counts.get(hat_id).unwrap_or(&0);
1691        if count < max {
1692            return (false, None);
1693        }
1694
1695        // Emit only once per hat per run (avoid flooding).
1696        let should_emit = self.state.exhausted_hats.insert(hat_id.clone());
1697
1698        if !should_emit {
1699            // Hat is already exhausted - drop pending events silently.
1700            return (true, None);
1701        }
1702
1703        let mut dropped_topics: Vec<String> = dropped.iter().map(|e| e.topic.to_string()).collect();
1704        dropped_topics.sort();
1705
1706        let payload = format!(
1707            "Hat '{hat}' exhausted.\n- max_activations: {max}\n- activations: {count}\n- dropped_topics:\n  - {topics}",
1708            hat = hat_id.as_str(),
1709            max = max,
1710            count = count,
1711            topics = dropped_topics.join("\n  - ")
1712        );
1713
1714        warn!(
1715            hat = %hat_id.as_str(),
1716            max_activations = max,
1717            activations = count,
1718            "Hat exhausted (max_activations reached)"
1719        );
1720
1721        (
1722            true,
1723            Some(Event::new(
1724                format!("{}.exhausted", hat_id.as_str()),
1725                payload,
1726            )),
1727        )
1728    }
1729
1730    fn record_hat_activations(&mut self, active_hat_ids: &[HatId]) {
1731        for hat_id in active_hat_ids {
1732            *self
1733                .state
1734                .hat_activation_counts
1735                .entry(hat_id.clone())
1736                .or_insert(0) += 1;
1737        }
1738    }
1739
1740    /// Returns the primary active hat ID for display purposes.
1741    /// Returns the first active hat, or "ralph" if no specific hat is active.
1742    /// BTreeMap iteration is already sorted by key.
1743    pub fn get_active_hat_id(&self) -> HatId {
1744        let pending_events = self.peek_pending_regular_events();
1745        if let Some(active_hat_id) = self
1746            .determine_active_hat_ids(&pending_events)
1747            .into_iter()
1748            .next()
1749        {
1750            return active_hat_id;
1751        }
1752        HatId::new("ralph")
1753    }
1754
1755    /// Injects a default event for a hat when the agent wrote no events.
1756    ///
1757    /// Call this after `process_events_from_jsonl` returns `Ok(false)` (no events found).
1758    /// If the hat has `default_publishes` configured, this injects the default event.
1759    ///
1760    /// If the default topic matches the completion promise, `completion_requested` is set
1761    /// so the loop can terminate. Without this, completion events injected via
1762    /// `default_publishes` would only be published to the bus (triggering downstream hats)
1763    /// but never detected by `check_completion_event`, causing an infinite loop.
1764    pub fn check_default_publishes(&mut self, hat_id: &HatId) {
1765        if let Some(config) = self.registry.get_config(hat_id)
1766            && let Some(default_topic) = &config.default_publishes
1767        {
1768            let default_event = Event::new(default_topic.as_str(), "").with_source(hat_id.clone());
1769
1770            debug!(
1771                hat = %hat_id.as_str(),
1772                topic = %default_topic,
1773                "No events written by hat, injecting default_publishes event"
1774            );
1775
1776            self.state.record_event(&default_event);
1777
1778            // If the default topic is the completion promise, set the flag directly.
1779            // The normal path (process_events_from_jsonl) sets this when reading from
1780            // JSONL, but default_publishes bypasses JSONL entirely.
1781            if default_topic.as_str() == self.config.event_loop.completion_promise {
1782                info!(
1783                    hat = %hat_id.as_str(),
1784                    topic = %default_topic,
1785                    "default_publishes matches completion_promise — requesting termination"
1786                );
1787                self.state.completion_requested = true;
1788            }
1789
1790            self.bus.publish(default_event);
1791        }
1792    }
1793
1794    /// Returns a mutable reference to the event bus for direct event publishing.
1795    ///
1796    /// This is primarily used for planning sessions to inject user responses
1797    /// as events into the orchestration loop.
1798    pub fn bus(&mut self) -> &mut EventBus {
1799        &mut self.bus
1800    }
1801
1802    /// Processes output from a hat execution.
1803    ///
1804    /// Returns the termination reason if the loop should stop.
1805    pub fn process_output(
1806        &mut self,
1807        hat_id: &HatId,
1808        output: &str,
1809        success: bool,
1810    ) -> Option<TerminationReason> {
1811        self.state.iteration += 1;
1812        self.state.last_hat = Some(hat_id.clone());
1813
1814        // Periodic robot check-in
1815        if let Some(interval_secs) = self.config.robot.checkin_interval_seconds
1816            && let Some(ref robot_service) = self.robot_service
1817        {
1818            let elapsed = self.state.elapsed();
1819            let interval = std::time::Duration::from_secs(interval_secs);
1820            let last = self
1821                .state
1822                .last_checkin_at
1823                .map(|t| t.elapsed())
1824                .unwrap_or(elapsed);
1825
1826            if last >= interval {
1827                let context = self.build_checkin_context(hat_id);
1828                match robot_service.send_checkin(self.state.iteration, elapsed, Some(&context)) {
1829                    Ok(_) => {
1830                        self.state.last_checkin_at = Some(std::time::Instant::now());
1831                        debug!(iteration = self.state.iteration, "Sent robot check-in");
1832                    }
1833                    Err(e) => {
1834                        warn!(error = %e, "Failed to send robot check-in");
1835                    }
1836                }
1837            }
1838        }
1839
1840        // Log iteration started
1841        self.diagnostics.log_orchestration(
1842            self.state.iteration,
1843            "loop",
1844            crate::diagnostics::OrchestrationEvent::IterationStarted,
1845        );
1846
1847        // Log hat selected
1848        self.diagnostics.log_orchestration(
1849            self.state.iteration,
1850            "loop",
1851            crate::diagnostics::OrchestrationEvent::HatSelected {
1852                hat: hat_id.to_string(),
1853                reason: "process_output".to_string(),
1854            },
1855        );
1856
1857        // Track failures
1858        if success {
1859            self.state.consecutive_failures = 0;
1860        } else {
1861            self.state.consecutive_failures += 1;
1862        }
1863
1864        let _ = output;
1865
1866        // File-modification audit: detect when a hat with disallowed Edit/Write tools
1867        // modified files. This is hard enforcement — emits a scope_violation event.
1868        self.audit_file_modifications(hat_id);
1869
1870        // Events are ONLY read from the JSONL file written by `ralph emit`.
1871        // This enforces tool use and prevents confabulation (agent claiming to emit without actually doing so).
1872        // See process_events_from_jsonl() for event processing.
1873
1874        // Check termination conditions
1875        self.check_termination()
1876    }
1877
1878    /// Audits file modifications after a hat iteration.
1879    ///
1880    /// If the hat has `Edit` or `Write` in its `disallowed_tools`, checks whether
1881    /// files were modified (via `git diff --stat HEAD`). If so, emits a
1882    /// `<hat_id>.scope_violation` event.
1883    fn audit_file_modifications(&mut self, hat_id: &HatId) {
1884        let config = match self.registry.get_config(hat_id) {
1885            Some(c) => c,
1886            None => return,
1887        };
1888
1889        let has_write_restriction = config
1890            .disallowed_tools
1891            .iter()
1892            .any(|t| t == "Edit" || t == "Write");
1893
1894        if !has_write_restriction {
1895            return;
1896        }
1897
1898        let workspace = &self.config.core.workspace_root;
1899        let diff_output = std::process::Command::new("git")
1900            .args(["diff", "--stat", "HEAD"])
1901            .current_dir(workspace)
1902            .output();
1903
1904        match diff_output {
1905            Ok(output) if !output.stdout.is_empty() => {
1906                let diff_stat = String::from_utf8_lossy(&output.stdout).trim().to_string();
1907                warn!(
1908                    hat = %hat_id.as_str(),
1909                    diff = %diff_stat,
1910                    "Hat modified files despite tool restrictions (scope violation)"
1911                );
1912
1913                let violation_topic = format!("{}.scope_violation", hat_id.as_str());
1914                let violation = Event::new(
1915                    violation_topic.as_str(),
1916                    format!(
1917                        "Hat '{}' modified files with Edit/Write disallowed:\n{}",
1918                        hat_id.as_str(),
1919                        diff_stat
1920                    ),
1921                );
1922                self.bus.publish(violation);
1923            }
1924            Err(e) => {
1925                debug!(error = %e, "Could not run git diff for file-modification audit");
1926            }
1927            _ => {} // No modifications — all good
1928        }
1929    }
1930
1931    /// Extracts task identifier from build.blocked payload.
1932    /// Uses first line of payload as task ID.
1933    fn extract_task_id(payload: &str) -> String {
1934        payload
1935            .lines()
1936            .next()
1937            .unwrap_or("unknown")
1938            .trim()
1939            .to_string()
1940    }
1941
1942    /// Adds cost to the cumulative total.
1943    pub fn add_cost(&mut self, cost: f64) {
1944        self.state.cumulative_cost += cost;
1945    }
1946
1947    /// Verifies all tasks in scratchpad are complete or cancelled.
1948    ///
1949    /// Returns:
1950    /// - `Ok(true)` if all tasks are `[x]` or `[~]`, or if scratchpad is disabled
1951    /// - `Ok(false)` if any tasks are `[ ]` (pending)
1952    /// - `Err(...)` if scratchpad doesn't exist or can't be read
1953    fn verify_scratchpad_complete(&self) -> Result<bool, std::io::Error> {
1954        // Nothing to verify when scratchpad is disabled
1955        if !self.ralph.active_scratchpad().enabled {
1956            return Ok(true);
1957        }
1958
1959        let scratchpad_path = self.scratchpad_path();
1960
1961        if !scratchpad_path.exists() {
1962            return Err(std::io::Error::new(
1963                std::io::ErrorKind::NotFound,
1964                "Scratchpad does not exist",
1965            ));
1966        }
1967
1968        let content = std::fs::read_to_string(scratchpad_path)?;
1969
1970        let has_pending = content
1971            .lines()
1972            .any(|line| line.trim_start().starts_with("- [ ]"));
1973
1974        Ok(!has_pending)
1975    }
1976
1977    /// Reads the current loop ID from the marker file.
1978    ///
1979    /// Returns `None` if no marker exists or is empty, which means
1980    /// task queries should be unfiltered (backwards compatible).
1981    fn current_loop_id(&self) -> Option<String> {
1982        self.loop_context
1983            .as_ref()
1984            .and_then(|ctx| {
1985                let marker_path = ctx.ralph_dir().join("current-loop-id");
1986                std::fs::read_to_string(&marker_path).ok()
1987            })
1988            .map(|id| id.trim().to_string())
1989            .filter(|id| !id.is_empty())
1990    }
1991
1992    /// Filters a task list by loop ID. When `loop_id` is `None`, returns all tasks.
1993    fn filter_tasks_by_loop<'a>(
1994        tasks: Vec<&'a crate::task::Task>,
1995        loop_id: Option<&str>,
1996    ) -> Vec<&'a crate::task::Task> {
1997        match loop_id {
1998            Some(id) => tasks
1999                .into_iter()
2000                .filter(|t| t.loop_id.as_deref() == Some(id))
2001                .collect(),
2002            None => tasks,
2003        }
2004    }
2005
2006    fn verify_tasks_complete(&self) -> Result<bool, std::io::Error> {
2007        use crate::task_store::TaskStore;
2008
2009        let tasks_path = self.tasks_path();
2010
2011        // No tasks file = no pending tasks = complete
2012        if !tasks_path.exists() {
2013            return Ok(true);
2014        }
2015
2016        let store = TaskStore::load(&tasks_path)?;
2017        let current_loop_id = self.current_loop_id();
2018        let open = Self::filter_tasks_by_loop(store.open(), current_loop_id.as_deref());
2019        Ok(open.is_empty())
2020    }
2021
2022    /// Builds a [`CheckinContext`] with current loop state for robot check-ins.
2023    fn build_checkin_context(&self, hat_id: &HatId) -> CheckinContext {
2024        let (open_tasks, closed_tasks) = self.count_tasks();
2025        CheckinContext {
2026            current_hat: Some(hat_id.as_str().to_string()),
2027            open_tasks,
2028            closed_tasks,
2029            cumulative_cost: self.state.cumulative_cost,
2030        }
2031    }
2032
2033    /// Counts open and closed tasks from the task store.
2034    ///
2035    /// Returns `(open_count, closed_count)`. "Open" means non-terminal tasks,
2036    /// "closed" means tasks with `TaskStatus::Closed`.
2037    fn count_tasks(&self) -> (usize, usize) {
2038        use crate::task_store::TaskStore;
2039
2040        let tasks_path = self.tasks_path();
2041        if !tasks_path.exists() {
2042            return (0, 0);
2043        }
2044
2045        match TaskStore::load(&tasks_path) {
2046            Ok(store) => {
2047                let current_loop_id = self.current_loop_id();
2048                let all = Self::filter_tasks_by_loop(
2049                    store.all().iter().collect(),
2050                    current_loop_id.as_deref(),
2051                );
2052                let open = Self::filter_tasks_by_loop(store.open(), current_loop_id.as_deref());
2053                let closed = all.len() - open.len();
2054                (open.len(), closed)
2055            }
2056            Err(_) => (0, 0),
2057        }
2058    }
2059
2060    /// Returns a list of open task descriptions for logging purposes.
2061    fn get_open_task_list(&self) -> Vec<String> {
2062        use crate::task_store::TaskStore;
2063
2064        let tasks_path = self.tasks_path();
2065        if let Ok(store) = TaskStore::load(&tasks_path) {
2066            let current_loop_id = self.current_loop_id();
2067            let open = Self::filter_tasks_by_loop(store.open(), current_loop_id.as_deref());
2068            return open
2069                .iter()
2070                .map(|t| format!("{}: {}", t.id, t.title))
2071                .collect();
2072        }
2073        vec![]
2074    }
2075
2076    fn warn_on_mutation_evidence(&self, evidence: &crate::event_parser::BackpressureEvidence) {
2077        let threshold = self.config.event_loop.mutation_score_warn_threshold;
2078
2079        match &evidence.mutants {
2080            Some(mutants) => {
2081                if let Some(reason) = Self::mutation_warning_reason(mutants, threshold) {
2082                    warn!(
2083                        reason = %reason,
2084                        mutants_status = ?mutants.status,
2085                        mutants_score = mutants.score_percent,
2086                        mutants_threshold = threshold,
2087                        "Mutation testing warning"
2088                    );
2089                }
2090            }
2091            None => {
2092                if let Some(threshold) = threshold {
2093                    warn!(
2094                        mutants_threshold = threshold,
2095                        "Mutation testing warning: missing mutation evidence in build.done payload"
2096                    );
2097                }
2098            }
2099        }
2100    }
2101
2102    fn mutation_warning_reason(
2103        mutants: &MutationEvidence,
2104        threshold: Option<f64>,
2105    ) -> Option<String> {
2106        match mutants.status {
2107            MutationStatus::Fail => Some("mutation testing failed".to_string()),
2108            MutationStatus::Warn => Some(Self::format_mutation_message(
2109                "mutation score below threshold",
2110                mutants.score_percent,
2111            )),
2112            MutationStatus::Unknown => Some("mutation testing status unknown".to_string()),
2113            MutationStatus::Pass => {
2114                let threshold = threshold?;
2115
2116                match mutants.score_percent {
2117                    Some(score) if score < threshold => Some(format!(
2118                        "mutation score {:.2}% below threshold {:.2}%",
2119                        score, threshold
2120                    )),
2121                    Some(_) => None,
2122                    None => Some(format!(
2123                        "mutation score missing (threshold {:.2}%)",
2124                        threshold
2125                    )),
2126                }
2127            }
2128        }
2129    }
2130
2131    fn format_mutation_message(message: &str, score: Option<f64>) -> String {
2132        match score {
2133            Some(score) => format!("{message} ({score:.2}%)"),
2134            None => message.to_string(),
2135        }
2136    }
2137
2138    fn parse_human_interact_context(payload: &str) -> Value {
2139        let mut context = match serde_json::from_str::<Value>(payload) {
2140            Ok(Value::Object(map)) => map,
2141            Ok(value) => {
2142                let mut map = Map::new();
2143                map.insert("question".to_string(), value);
2144                map
2145            }
2146            Err(_) => {
2147                let mut map = Map::new();
2148                map.insert("question".to_string(), Value::String(payload.to_string()));
2149                map
2150            }
2151        };
2152
2153        if !context.contains_key("question") {
2154            context.insert("question".to_string(), Value::String(payload.to_string()));
2155        }
2156
2157        Value::Object(context)
2158    }
2159
2160    fn is_restart_request_payload(payload: &str) -> bool {
2161        let payload = payload.to_ascii_lowercase();
2162        payload.contains("restart yourself") || payload.contains("restart ralph")
2163    }
2164
2165    fn is_restart_request_event(event: &Event) -> bool {
2166        matches!(event.topic.as_str(), "human.response" | "user.prompt")
2167            && Self::is_restart_request_payload(&event.payload)
2168    }
2169
2170    fn mark_restart_requested(&self, source: &str) {
2171        let restart_path =
2172            std::path::Path::new(&self.config.core.workspace_root).join(".ralph/restart-requested");
2173
2174        if let Some(parent) = restart_path.parent()
2175            && let Err(err) = std::fs::create_dir_all(parent)
2176        {
2177            warn!(
2178                error = %err,
2179                path = %parent.display(),
2180                "Failed to create restart-requested parent directory"
2181            );
2182            return;
2183        }
2184
2185        if let Err(err) = std::fs::write(&restart_path, source) {
2186            warn!(
2187                error = %err,
2188                path = %restart_path.display(),
2189                "Failed to write restart-requested signal"
2190            );
2191            return;
2192        }
2193
2194        info!(
2195            source,
2196            path = %restart_path.display(),
2197            "Restart requested from human text"
2198        );
2199    }
2200
2201    /// Processes events from JSONL and routes orphaned events to Ralph.
2202    ///
2203    /// Also handles backpressure for malformed JSONL lines by:
2204    /// 1. Emitting `event.malformed` system events for each parse failure
2205    /// 2. Tracking consecutive failures for termination check
2206    /// 3. Resetting counter when valid events are parsed
2207    ///
2208    /// Returns [`ProcessedEvents`] indicating whether events were found, whether
2209    /// semantic `plan.*` topics were published, structured `human.interact`
2210    /// context/outcome metadata, and whether any were orphans that Ralph should
2211    /// handle.
2212    pub fn process_events_from_jsonl(&mut self) -> std::io::Result<ProcessedEvents> {
2213        let result = self.event_reader.read_new_events()?;
2214        self.process_parse_result(result)
2215    }
2216
2217    /// Inner event processing that operates on an already-parsed `ParseResult`.
2218    ///
2219    /// This is the single source of truth for event validation, backpressure,
2220    /// scope enforcement, and bus publishing. Both `process_events_from_jsonl`
2221    /// and `process_events_from_jsonl_with_waves` delegate to this method.
2222    fn process_parse_result(
2223        &mut self,
2224        result: crate::event_reader::ParseResult,
2225    ) -> std::io::Result<ProcessedEvents> {
2226        // Handle malformed lines with backpressure
2227        for malformed in &result.malformed {
2228            let payload = format!(
2229                "Line {}: {}\nContent: {}",
2230                malformed.line_number, malformed.error, &malformed.content
2231            );
2232            let event = Event::new("event.malformed", &payload);
2233            self.bus.publish(event);
2234            self.state.consecutive_malformed_events += 1;
2235            warn!(
2236                line = malformed.line_number,
2237                consecutive = self.state.consecutive_malformed_events,
2238                "Malformed event line detected"
2239            );
2240        }
2241
2242        // Reset counter when valid events are parsed
2243        if !result.events.is_empty() {
2244            self.state.consecutive_malformed_events = 0;
2245        }
2246
2247        if result.events.is_empty() && result.malformed.is_empty() {
2248            return Ok(ProcessedEvents {
2249                had_events: false,
2250                had_plan_events: false,
2251                human_interact_context: None,
2252                has_orphans: false,
2253            });
2254        }
2255
2256        // --- Scope enforcement: filter events against active hat's publishes ---
2257        // Only active when enforce_hat_scope is true in config (opt-in).
2258        let events = if self.config.event_loop.enforce_hat_scope {
2259            let active_hats = self.state.last_active_hat_ids.clone();
2260            let (in_scope, out_of_scope): (Vec<_>, Vec<_>) =
2261                result.events.into_iter().partition(|event| {
2262                    if active_hats.is_empty() {
2263                        return true; // Ralph coordinating — no scope restriction
2264                    }
2265                    active_hats
2266                        .iter()
2267                        .any(|hat_id| self.registry.can_publish(hat_id, event.topic.as_str()))
2268                });
2269
2270            for event in &out_of_scope {
2271                let violation_hat = active_hats.first().map(|h| h.as_str()).unwrap_or("unknown");
2272                warn!(
2273                    active_hats = ?active_hats,
2274                    topic = %event.topic,
2275                    "Scope violation: active hat(s) cannot publish this topic — dropping event"
2276                );
2277                let violation_topic = format!("{}.scope_violation", violation_hat);
2278                let violation_payload = format!(
2279                    "Attempted to publish '{}': {}",
2280                    event.topic,
2281                    event.payload.clone().unwrap_or_default()
2282                );
2283                let violation = Event::new(violation_topic, violation_payload);
2284                self.bus.publish(violation);
2285            }
2286
2287            in_scope
2288        } else {
2289            result.events
2290        };
2291        // --- End scope enforcement ---
2292
2293        let mut has_orphans = false;
2294
2295        // Validate and transform events (apply backpressure for build.done)
2296        let mut validated_events = Vec::new();
2297        let completion_topic = self.config.event_loop.completion_promise.as_str();
2298        let cancellation_topic = self.config.event_loop.cancellation_promise.clone();
2299        let total_events = events.len();
2300        for (index, event) in events.into_iter().enumerate() {
2301            let payload = event.payload.clone().unwrap_or_default();
2302
2303            // Detect loop.cancel — unconditional graceful termination
2304            if !cancellation_topic.is_empty() && event.topic.as_str() == cancellation_topic {
2305                info!(
2306                    payload = %payload,
2307                    "loop.cancel event detected — scheduling graceful termination"
2308                );
2309                self.state.cancellation_requested = true;
2310                // Continue processing remaining events (they may contain cleanup info)
2311                continue;
2312            }
2313
2314            if event.topic == completion_topic {
2315                if index + 1 == total_events {
2316                    self.state.completion_requested = true;
2317                    self.diagnostics.log_orchestration(
2318                        self.state.iteration,
2319                        "jsonl",
2320                        crate::diagnostics::OrchestrationEvent::EventPublished {
2321                            topic: event.topic.clone(),
2322                        },
2323                    );
2324                    info!(
2325                        topic = %event.topic,
2326                        "Completion event detected in JSONL"
2327                    );
2328                } else {
2329                    warn!(
2330                        topic = %event.topic,
2331                        index = index,
2332                        total_events = total_events,
2333                        "Completion event ignored because it was not the last event"
2334                    );
2335                }
2336                continue;
2337            }
2338
2339            if event.topic == "build.done" {
2340                // Validate build.done events have backpressure evidence
2341                if let Some(evidence) = EventParser::parse_backpressure_evidence(&payload) {
2342                    if evidence.all_passed() {
2343                        self.warn_on_mutation_evidence(&evidence);
2344                        validated_events.push(Event::new(event.topic.as_str(), &payload));
2345                    } else {
2346                        // Evidence present but checks failed - synthesize build.blocked
2347                        warn!(
2348                            tests = evidence.tests_passed,
2349                            lint = evidence.lint_passed,
2350                            typecheck = evidence.typecheck_passed,
2351                            audit = evidence.audit_passed,
2352                            coverage = evidence.coverage_passed,
2353                            complexity = evidence.complexity_score,
2354                            duplication = evidence.duplication_passed,
2355                            performance = evidence.performance_regression,
2356                            specs = evidence.specs_verified,
2357                            "build.done rejected: backpressure checks failed"
2358                        );
2359
2360                        let complexity = evidence
2361                            .complexity_score
2362                            .map(|value| format!("{value:.2}"))
2363                            .unwrap_or_else(|| "missing".to_string());
2364                        let performance = match evidence.performance_regression {
2365                            Some(true) => "regression".to_string(),
2366                            Some(false) => "pass".to_string(),
2367                            None => "missing".to_string(),
2368                        };
2369                        let specs = match evidence.specs_verified {
2370                            Some(true) => "pass".to_string(),
2371                            Some(false) => "fail".to_string(),
2372                            None => "not reported".to_string(),
2373                        };
2374
2375                        self.diagnostics.log_orchestration(
2376                            self.state.iteration,
2377                            "jsonl",
2378                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2379                                reason: format!(
2380                                    "backpressure checks failed: tests={}, lint={}, typecheck={}, audit={}, coverage={}, complexity={}, duplication={}, performance={}, specs={}",
2381                                    evidence.tests_passed,
2382                                    evidence.lint_passed,
2383                                    evidence.typecheck_passed,
2384                                    evidence.audit_passed,
2385                                    evidence.coverage_passed,
2386                                    complexity,
2387                                    evidence.duplication_passed,
2388                                    performance,
2389                                    specs
2390                                ),
2391                            },
2392                        );
2393
2394                        validated_events.push(Event::new(
2395                            "build.blocked",
2396                            "Backpressure checks failed. Fix tests/lint/typecheck/audit/coverage/complexity/duplication/specs before emitting build.done.",
2397                        ));
2398                    }
2399                } else {
2400                    // No evidence found - synthesize build.blocked
2401                    warn!("build.done rejected: missing backpressure evidence");
2402
2403                    self.diagnostics.log_orchestration(
2404                        self.state.iteration,
2405                        "jsonl",
2406                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2407                            reason: "missing backpressure evidence".to_string(),
2408                        },
2409                    );
2410
2411                    validated_events.push(Event::new(
2412                        "build.blocked",
2413                        "Missing backpressure evidence. Include 'tests: pass', 'lint: pass', 'typecheck: pass', 'audit: pass', 'coverage: pass', 'complexity: <score>', 'duplication: pass', 'performance: pass' (optional), 'specs: pass' (optional) in build.done payload.",
2414                    ));
2415                }
2416            } else if event.topic == "review.done" && !event.is_wave_event() {
2417                // Validate review.done events have verification evidence.
2418                // Wave worker events skip this — wave reviews are read-only
2419                // and don't run tests/builds.
2420                if let Some(evidence) = EventParser::parse_review_evidence(&payload) {
2421                    if evidence.is_verified() {
2422                        validated_events.push(Event::new(event.topic.as_str(), &payload));
2423                    } else {
2424                        // Evidence present but checks failed - synthesize review.blocked
2425                        warn!(
2426                            tests = evidence.tests_passed,
2427                            build = evidence.build_passed,
2428                            "review.done rejected: verification checks failed"
2429                        );
2430
2431                        self.diagnostics.log_orchestration(
2432                            self.state.iteration,
2433                            "jsonl",
2434                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2435                                reason: format!(
2436                                    "review verification failed: tests={}, build={}",
2437                                    evidence.tests_passed, evidence.build_passed
2438                                ),
2439                            },
2440                        );
2441
2442                        validated_events.push(Event::new(
2443                            "review.blocked",
2444                            "Review verification failed. Run tests and build before emitting review.done.",
2445                        ));
2446                    }
2447                } else {
2448                    // No evidence found - synthesize review.blocked
2449                    warn!("review.done rejected: missing verification evidence");
2450
2451                    self.diagnostics.log_orchestration(
2452                        self.state.iteration,
2453                        "jsonl",
2454                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2455                            reason: "missing review verification evidence".to_string(),
2456                        },
2457                    );
2458
2459                    validated_events.push(Event::new(
2460                        "review.blocked",
2461                        "Missing verification evidence. Include 'tests: pass' and 'build: pass' in review.done payload.",
2462                    ));
2463                }
2464            } else if event.topic == "verify.passed" {
2465                if let Some(report) = EventParser::parse_quality_report(&payload) {
2466                    if report.meets_thresholds() {
2467                        validated_events.push(Event::new(event.topic.as_str(), &payload));
2468                    } else {
2469                        let failed = report.failed_dimensions();
2470                        let reason = if failed.is_empty() {
2471                            "quality thresholds failed".to_string()
2472                        } else {
2473                            format!("quality thresholds failed: {}", failed.join(", "))
2474                        };
2475
2476                        warn!(
2477                            failed_dimensions = ?failed,
2478                            "verify.passed rejected: quality thresholds failed"
2479                        );
2480
2481                        self.diagnostics.log_orchestration(
2482                            self.state.iteration,
2483                            "jsonl",
2484                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2485                                reason,
2486                            },
2487                        );
2488
2489                        validated_events.push(Event::new(
2490                            "verify.failed",
2491                            "Quality thresholds failed. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity with thresholds in verify.passed payload.",
2492                        ));
2493                    }
2494                } else {
2495                    // No quality report found - synthesize verify.failed
2496                    warn!("verify.passed rejected: missing quality report");
2497
2498                    self.diagnostics.log_orchestration(
2499                        self.state.iteration,
2500                        "jsonl",
2501                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2502                            reason: "missing quality report".to_string(),
2503                        },
2504                    );
2505
2506                    validated_events.push(Event::new(
2507                        "verify.failed",
2508                        "Missing quality report. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity in verify.passed payload.",
2509                    ));
2510                }
2511            } else if event.topic == "verify.failed" {
2512                if EventParser::parse_quality_report(&payload).is_none() {
2513                    warn!("verify.failed missing quality report");
2514                }
2515                validated_events.push(Event::new(event.topic.as_str(), &payload));
2516            } else {
2517                // Non-backpressure events pass through unchanged
2518                validated_events.push(Event::new(event.topic.as_str(), &payload));
2519            }
2520        }
2521
2522        // Track build.blocked events for thrashing detection
2523        let blocked_events: Vec<_> = validated_events
2524            .iter()
2525            .filter(|e| e.topic == "build.blocked".into())
2526            .collect();
2527
2528        for blocked_event in &blocked_events {
2529            let task_id = Self::extract_task_id(&blocked_event.payload);
2530
2531            let count = self
2532                .state
2533                .task_block_counts
2534                .entry(task_id.clone())
2535                .or_insert(0);
2536            *count += 1;
2537
2538            debug!(
2539                task_id = %task_id,
2540                block_count = *count,
2541                "Task blocked"
2542            );
2543
2544            // After 3 blocks on same task, emit build.task.abandoned
2545            if *count >= 3 && !self.state.abandoned_tasks.contains(&task_id) {
2546                warn!(
2547                    task_id = %task_id,
2548                    "Task abandoned after 3 consecutive blocks"
2549                );
2550
2551                self.state.abandoned_tasks.push(task_id.clone());
2552
2553                self.diagnostics.log_orchestration(
2554                    self.state.iteration,
2555                    "jsonl",
2556                    crate::diagnostics::OrchestrationEvent::TaskAbandoned {
2557                        reason: format!(
2558                            "3 consecutive build.blocked events for task '{}'",
2559                            task_id
2560                        ),
2561                    },
2562                );
2563
2564                let abandoned_event = Event::new(
2565                    "build.task.abandoned",
2566                    format!(
2567                        "Task '{}' abandoned after 3 consecutive build.blocked events",
2568                        task_id
2569                    ),
2570                );
2571
2572                self.bus.publish(abandoned_event);
2573            }
2574        }
2575
2576        // Track hat-level blocking for legacy thrashing detection
2577        let has_blocked_event = !blocked_events.is_empty();
2578
2579        if has_blocked_event {
2580            self.state.consecutive_blocked += 1;
2581        } else {
2582            self.state.consecutive_blocked = 0;
2583            self.state.last_blocked_hat = None;
2584        }
2585
2586        // Handle human.interact blocking behavior:
2587        // When a human.interact event is detected and robot service is active,
2588        // send the question and block until human.response or timeout.
2589        let mut response_event = None;
2590        let mut human_interact_context = None;
2591        let ask_human_idx = validated_events
2592            .iter()
2593            .position(|e| e.topic == "human.interact".into());
2594
2595        if let Some(idx) = ask_human_idx {
2596            let ask_event = &validated_events[idx];
2597            let payload = ask_event.payload.clone();
2598
2599            let mut context = match Self::parse_human_interact_context(&payload) {
2600                Value::Object(map) => map,
2601                _ => Map::new(),
2602            };
2603
2604            if let Some(ref robot_service) = self.robot_service {
2605                info!(
2606                    payload = %payload,
2607                    "human.interact event detected — sending question via robot service"
2608                );
2609
2610                // Send the question (includes retry with exponential backoff)
2611                let send_ok = match robot_service.send_question(&payload) {
2612                    Ok(_message_id) => true,
2613                    Err(e) => {
2614                        warn!(
2615                            error = %e,
2616                            "Failed to send human.interact question after retries — treating as timeout"
2617                        );
2618                        // Log to diagnostics
2619                        self.diagnostics.log_error(
2620                            self.state.iteration,
2621                            "telegram",
2622                            crate::diagnostics::DiagnosticError::TelegramSendError {
2623                                operation: "send_question".to_string(),
2624                                error: e.to_string(),
2625                                retry_count: 3,
2626                            },
2627                        );
2628                        context.insert(
2629                            "outcome".to_string(),
2630                            Value::String("send_failure".to_string()),
2631                        );
2632                        context.insert("error".to_string(), Value::String(e.to_string()));
2633                        false
2634                    }
2635                };
2636
2637                // Block: poll events file for human.response
2638                // Per spec, even on send failure we treat as timeout (continue without blocking)
2639                if send_ok {
2640                    // Read the active events path from the current-events marker,
2641                    // falling back to the default events.jsonl if not available.
2642                    let events_path = self
2643                        .loop_context
2644                        .as_ref()
2645                        .and_then(|ctx| {
2646                            std::fs::read_to_string(ctx.current_events_marker())
2647                                .ok()
2648                                .map(|s| ctx.workspace().join(s.trim()))
2649                        })
2650                        .or_else(|| {
2651                            std::fs::read_to_string(".ralph/current-events")
2652                                .ok()
2653                                .map(|s| PathBuf::from(s.trim()))
2654                        })
2655                        .unwrap_or_else(|| {
2656                            self.loop_context
2657                                .as_ref()
2658                                .map(|ctx| ctx.events_path())
2659                                .unwrap_or_else(|| PathBuf::from(".ralph/events.jsonl"))
2660                        });
2661
2662                    match robot_service.wait_for_response(&events_path) {
2663                        Ok(Some(response)) => {
2664                            info!(
2665                                response = %response,
2666                                "Received human.response — continuing loop"
2667                            );
2668                            context.insert(
2669                                "outcome".to_string(),
2670                                Value::String("response".to_string()),
2671                            );
2672                            context.insert("response".to_string(), Value::String(response.clone()));
2673                            // Create a human.response event to inject into the bus
2674                            response_event = Some(Event::new("human.response", &response));
2675                        }
2676                        Ok(None) => {
2677                            warn!(
2678                                timeout_secs = robot_service.timeout_secs(),
2679                                "Human response timeout — injecting human.timeout event"
2680                            );
2681                            context.insert(
2682                                "outcome".to_string(),
2683                                Value::String("timeout".to_string()),
2684                            );
2685                            context.insert(
2686                                "timeout_seconds".to_string(),
2687                                Value::from(robot_service.timeout_secs()),
2688                            );
2689                            let timeout_event = Event::new(
2690                                "human.timeout",
2691                                format!(
2692                                    "No response after {}s. Original question: {}",
2693                                    robot_service.timeout_secs(),
2694                                    payload
2695                                ),
2696                            );
2697                            response_event = Some(timeout_event);
2698                        }
2699                        Err(e) => {
2700                            warn!(
2701                                error = %e,
2702                                "Error waiting for human response — injecting human.timeout event"
2703                            );
2704                            context.insert(
2705                                "outcome".to_string(),
2706                                Value::String("wait_error".to_string()),
2707                            );
2708                            context.insert("error".to_string(), Value::String(e.to_string()));
2709                            let timeout_event = Event::new(
2710                                "human.timeout",
2711                                format!(
2712                                    "Error waiting for response: {}. Original question: {}",
2713                                    e, payload
2714                                ),
2715                            );
2716                            response_event = Some(timeout_event);
2717                        }
2718                    }
2719                }
2720            } else {
2721                debug!(
2722                    "human.interact event detected but no robot service active — passing through"
2723                );
2724                context.insert(
2725                    "outcome".to_string(),
2726                    Value::String("no_robot_service".to_string()),
2727                );
2728            }
2729
2730            human_interact_context = Some(Value::Object(context));
2731        }
2732
2733        let restart_requested = validated_events.iter().any(Self::is_restart_request_event)
2734            || response_event
2735                .as_ref()
2736                .is_some_and(Self::is_restart_request_event);
2737        if restart_requested {
2738            self.mark_restart_requested("human_text");
2739        }
2740
2741        // Track whether any events will be published (before the loop consumes them).
2742        let had_events = !validated_events.is_empty();
2743        let had_plan_events = validated_events
2744            .iter()
2745            .any(|event| event.topic.as_str().starts_with("plan."));
2746
2747        // Publish validated events to the bus.
2748        // Ralph is always registered with subscribe("*"), so every event has at least
2749        // one subscriber. Events without a specific hat subscriber are "orphaned" —
2750        // Ralph handles them as the universal fallback.
2751        for event in validated_events {
2752            // Record topic for event chain validation
2753            self.state.record_event(&event);
2754
2755            self.diagnostics.log_orchestration(
2756                self.state.iteration,
2757                "jsonl",
2758                crate::diagnostics::OrchestrationEvent::EventPublished {
2759                    topic: event.topic.to_string(),
2760                },
2761            );
2762
2763            if !self.registry.has_subscriber(event.topic.as_str()) {
2764                has_orphans = true;
2765            }
2766
2767            debug!(
2768                topic = %event.topic,
2769                "Publishing event from JSONL"
2770            );
2771            self.bus.publish(event);
2772        }
2773
2774        // Publish human.response event if one was received during blocking
2775        if let Some(response) = response_event {
2776            self.state.record_event(&response);
2777            info!(
2778                topic = %response.topic,
2779                "Publishing human.response event from robot service"
2780            );
2781            self.bus.publish(response);
2782        }
2783
2784        Ok(ProcessedEvents {
2785            had_events,
2786            had_plan_events,
2787            human_interact_context,
2788            has_orphans,
2789        })
2790    }
2791
2792    /// Process events from JSONL, partitioning wave events from regular events.
2793    ///
2794    /// Wave events (those with `wave_id` set and targeting a concurrent hat) are
2795    /// extracted and returned separately. Regular events go through the full
2796    /// backpressure pipeline via `process_parse_result`.
2797    pub fn process_events_from_jsonl_with_waves(
2798        &mut self,
2799    ) -> std::io::Result<ProcessedEventsWithWaves> {
2800        let result = self.event_reader.read_new_events()?;
2801
2802        // Partition: wave dispatch events vs regular events.
2803        // Only events that target a concurrent hat (concurrency > 1) are wave dispatches.
2804        // Wave *results* (e.g. review.done) have wave_id set but should be treated as
2805        // regular events so they reach the bus and trigger downstream hats (e.g. aggregator).
2806        //
2807        // Uses find_by_trigger + get_config — the same resolution path as
2808        // detect_wave_events — to ensure partition and detection agree.
2809        let (wave_events, regular_events): (Vec<_>, Vec<_>) =
2810            result.events.into_iter().partition(|e| {
2811                e.wave_id.is_some()
2812                    && self
2813                        .registry
2814                        .find_by_trigger(e.topic.as_str())
2815                        .and_then(|hat_id| self.registry.get_config(hat_id))
2816                        .is_some_and(|hat_config| hat_config.concurrency > 1)
2817            });
2818
2819        if !wave_events.is_empty() {
2820            debug!(
2821                wave_count = wave_events.len(),
2822                regular_count = regular_events.len(),
2823                "Partitioned wave events from regular events"
2824            );
2825        }
2826
2827        // Delegate regular events to the full pipeline (backpressure, scope
2828        // enforcement, human.interact, plan detection, etc.)
2829        let regular_result = crate::event_reader::ParseResult {
2830            events: regular_events,
2831            malformed: result.malformed,
2832        };
2833        let processed = self.process_parse_result(regular_result)?;
2834
2835        Ok(ProcessedEventsWithWaves {
2836            processed,
2837            wave_events,
2838        })
2839    }
2840
2841    /// Checks if output contains a completion event from Ralph.
2842    ///
2843    /// Completion must be emitted as an `<event>` tag, not plain text.
2844    pub fn check_ralph_completion(&self, output: &str) -> bool {
2845        let events = EventParser::new().parse(output);
2846        events
2847            .iter()
2848            .any(|event| event.topic.as_str() == self.config.event_loop.completion_promise)
2849    }
2850
2851    /// Publishes the loop.terminate system event to observers.
2852    ///
2853    /// Per spec: "Published by the orchestrator (not agents) when the loop exits."
2854    /// This is an observer-only event—hats cannot trigger on it.
2855    ///
2856    /// Returns the event for logging purposes.
2857    pub fn publish_terminate_event(&mut self, reason: &TerminationReason) -> Event {
2858        // Stop the robot service if it was running
2859        self.stop_robot_service();
2860
2861        let elapsed = self.state.elapsed();
2862        let duration_str = format_duration(elapsed);
2863
2864        let payload = format!(
2865            "## Reason\n{}\n\n## Status\n{}\n\n## Summary\n- Iterations: {}\n- Duration: {}\n- Exit code: {}",
2866            reason.as_str(),
2867            termination_status_text(reason),
2868            self.state.iteration,
2869            duration_str,
2870            reason.exit_code()
2871        );
2872
2873        let event = Event::new("loop.terminate", &payload);
2874
2875        // Publish to bus for observers (but no hat can trigger on this)
2876        self.bus.publish(event.clone());
2877
2878        info!(
2879            reason = %reason.as_str(),
2880            iterations = self.state.iteration,
2881            duration = %duration_str,
2882            "Wrapping up: {}. {} iterations in {}.",
2883            reason.as_str(),
2884            self.state.iteration,
2885            duration_str
2886        );
2887
2888        event
2889    }
2890
2891    /// Returns the robot service's shutdown flag, if active.
2892    ///
2893    /// Signal handlers can set this flag to interrupt `wait_for_response()`
2894    /// without waiting for the full timeout.
2895    pub fn robot_shutdown_flag(&self) -> Option<Arc<AtomicBool>> {
2896        self.robot_service.as_ref().map(|s| s.shutdown_flag())
2897    }
2898
2899    /// Stops the robot service if it's running.
2900    ///
2901    /// Called during loop termination to cleanly shut down the communication backend.
2902    fn stop_robot_service(&mut self) {
2903        if let Some(service) = self.robot_service.take() {
2904            service.stop();
2905        }
2906    }
2907
2908    // -------------------------------------------------------------------------
2909    // Human-in-the-loop planning support
2910    // -------------------------------------------------------------------------
2911
2912    /// Check if any event is a `user.prompt` event.
2913    ///
2914    /// Returns the first user prompt event found, or None.
2915    pub fn check_for_user_prompt(&self, events: &[Event]) -> Option<UserPrompt> {
2916        events
2917            .iter()
2918            .find(|e| e.topic.as_str() == "user.prompt")
2919            .map(|e| UserPrompt {
2920                id: Self::extract_prompt_id(&e.payload),
2921                text: e.payload.clone(),
2922            })
2923    }
2924
2925    /// Extract a prompt ID from the event payload.
2926    ///
2927    /// Supports both XML attribute format: `<event topic="user.prompt" id="q1">...</event>`
2928    /// and JSON format in payload.
2929    fn extract_prompt_id(payload: &str) -> String {
2930        // Try to extract id attribute from XML-like format first
2931        if let Some(start) = payload.find("id=\"")
2932            && let Some(end) = payload[start + 4..].find('"')
2933        {
2934            return payload[start + 4..start + 4 + end].to_string();
2935        }
2936
2937        // Fallback: generate a simple ID based on timestamp
2938        format!("q{}", Self::generate_prompt_id())
2939    }
2940
2941    /// Generate a simple unique ID for prompts.
2942    /// Uses timestamp-based generation since uuid crate isn't available.
2943    fn generate_prompt_id() -> String {
2944        use std::time::{SystemTime, UNIX_EPOCH};
2945        let nanos = SystemTime::now()
2946            .duration_since(UNIX_EPOCH)
2947            .unwrap()
2948            .as_nanos();
2949        format!("{:x}", nanos % 0xFFFF_FFFF)
2950    }
2951}
2952
/// A question raised by the agent that needs an answer from a human.
///
/// Built from a `user.prompt` event emitted during planning.
#[derive(Clone, Debug)]
pub struct UserPrompt {
    /// Identifier of this prompt (e.g., "q1", "q2").
    pub id: String,
    /// Text of the prompt/question.
    pub text: String,
}
2963
/// Renders a duration as `Hh Mm Ss`, omitting leading units that are zero.
///
/// Sub-second precision is discarded. Examples: `45s`, `3m 0s`, `1h 0m 5s`.
fn format_duration(d: Duration) -> String {
    const SECS_PER_MINUTE: u64 = 60;
    const SECS_PER_HOUR: u64 = 3600;

    let whole_secs = d.as_secs();
    let h = whole_secs / SECS_PER_HOUR;
    let m = (whole_secs % SECS_PER_HOUR) / SECS_PER_MINUTE;
    let s = whole_secs % SECS_PER_MINUTE;

    match (h, m) {
        (0, 0) => format!("{}s", s),
        (0, _) => format!("{}m {}s", m, s),
        _ => format!("{}h {}m {}s", h, m, s),
    }
}
2979
2980/// Returns a human-readable status based on termination reason.
2981fn termination_status_text(reason: &TerminationReason) -> &'static str {
2982    match reason {
2983        TerminationReason::CompletionPromise => "All tasks completed successfully.",
2984        TerminationReason::MaxIterations => "Stopped at iteration limit.",
2985        TerminationReason::MaxRuntime => "Stopped at runtime limit.",
2986        TerminationReason::MaxCost => "Stopped at cost limit.",
2987        TerminationReason::ConsecutiveFailures => "Too many consecutive failures.",
2988        TerminationReason::LoopThrashing => {
2989            "Loop thrashing detected - same hat repeatedly blocked."
2990        }
2991        TerminationReason::LoopStale => {
2992            "Stale loop detected - same topic emitted 3+ times consecutively."
2993        }
2994        TerminationReason::ValidationFailure => "Too many consecutive malformed JSONL events.",
2995        TerminationReason::Stopped => "Manually stopped.",
2996        TerminationReason::Interrupted => "Interrupted by signal.",
2997        TerminationReason::RestartRequested => "Restarting by human request.",
2998        TerminationReason::WorkspaceGone => "Workspace directory removed externally.",
2999        TerminationReason::Cancelled => "Cancelled gracefully (human rejection or timeout).",
3000    }
3001}