Skip to main content

ralph_core/event_loop/
mod.rs

1//! Event loop orchestration.
2//!
3//! The event loop coordinates the execution of hats via pub/sub messaging.
4
5mod loop_state;
6#[cfg(test)]
7mod tests;
8
9pub use loop_state::LoopState;
10
11use crate::config::{HatBackend, InjectMode, RalphConfig};
12use crate::event_parser::EventParser;
13use crate::event_reader::EventReader;
14use crate::hat_registry::HatRegistry;
15use crate::hatless_ralph::HatlessRalph;
16use crate::instructions::InstructionBuilder;
17use crate::loop_context::LoopContext;
18use crate::memory_store::{MarkdownMemoryStore, format_memories_as_markdown, truncate_to_budget};
19use crate::skill_registry::SkillRegistry;
20use ralph_proto::{Event, EventBus, Hat, HatId};
21use ralph_telegram::TelegramService;
22use std::path::PathBuf;
23use std::sync::Arc;
24use std::sync::atomic::AtomicBool;
25use std::time::Duration;
26use tracing::{debug, info, warn};
27
/// Why the event loop stopped running.
///
/// Each variant maps to a process exit code ([`TerminationReason::exit_code`]) and to a
/// stable string used in the `loop.terminate` event payload ([`TerminationReason::as_str`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationReason {
    /// The completion promise appeared in the output.
    CompletionPromise,
    /// The configured iteration limit was hit.
    MaxIterations,
    /// The configured wall-clock runtime limit was hit.
    MaxRuntime,
    /// The configured cost limit was hit.
    MaxCost,
    /// Too many failures happened back to back.
    ConsecutiveFailures,
    /// The loop was thrashing (repeated blocked events).
    LoopThrashing,
    /// Too many consecutive malformed JSONL lines were found in the events file.
    ValidationFailure,
    /// The loop was stopped manually.
    Stopped,
    /// A signal (SIGINT/SIGTERM) interrupted the loop.
    Interrupted,
    /// The completion promise was detected while in chaos mode.
    ChaosModeComplete,
    /// Chaos mode ran out of iterations.
    ChaosModeMaxIterations,
    /// A restart was requested through the Telegram `/restart` command.
    RestartRequested,
}

impl TerminationReason {
    /// Maps the reason to a process exit code.
    ///
    /// | code | meaning                                                        |
    /// |------|----------------------------------------------------------------|
    /// | 0    | completion promise detected (normal or chaos mode)             |
    /// | 1    | consecutive failures, thrashing, validation failure, or stop   |
    /// | 2    | an iteration / runtime / cost limit was exceeded               |
    /// | 3    | restart requested; signals the caller to exec-replace          |
    /// | 130  | user interrupt (SIGINT = 128 + 2)                              |
    pub fn exit_code(&self) -> i32 {
        match self {
            Self::CompletionPromise | Self::ChaosModeComplete => 0,
            Self::ConsecutiveFailures
            | Self::LoopThrashing
            | Self::ValidationFailure
            | Self::Stopped => 1,
            Self::MaxIterations
            | Self::MaxRuntime
            | Self::MaxCost
            | Self::ChaosModeMaxIterations => 2,
            Self::RestartRequested => 3,
            Self::Interrupted => 130,
        }
    }

    /// Returns the snake_case identifier placed in the `loop.terminate` event payload.
    ///
    /// Possible values: `completed`, `max_iterations`, `max_runtime`, `max_cost`,
    /// `consecutive_failures`, `loop_thrashing`, `validation_failure`, `stopped`,
    /// `interrupted`, `chaos_complete`, `chaos_max_iterations`, `restart_requested`.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::CompletionPromise => "completed",
            Self::MaxIterations => "max_iterations",
            Self::MaxRuntime => "max_runtime",
            Self::MaxCost => "max_cost",
            Self::ConsecutiveFailures => "consecutive_failures",
            Self::LoopThrashing => "loop_thrashing",
            Self::ValidationFailure => "validation_failure",
            Self::Stopped => "stopped",
            Self::Interrupted => "interrupted",
            Self::ChaosModeComplete => "chaos_complete",
            Self::ChaosModeMaxIterations => "chaos_max_iterations",
            Self::RestartRequested => "restart_requested",
        }
    }

    /// Reports whether the loop ended successfully, i.e. a completion promise was seen
    /// (in normal or chaos mode) rather than an error, a limit, or an interruption.
    pub fn is_success(&self) -> bool {
        matches!(self, Self::CompletionPromise | Self::ChaosModeComplete)
    }

    /// Reports whether this reason should hand over to chaos mode.
    ///
    /// Only a regular completion promise does; every other reason (including a
    /// chaos-mode completion) returns `false`.
    pub fn triggers_chaos_mode(&self) -> bool {
        *self == Self::CompletionPromise
    }
}
118
119/// The main event loop orchestrator.
120pub struct EventLoop {
121    config: RalphConfig,
122    registry: HatRegistry,
123    bus: EventBus,
124    state: LoopState,
125    instruction_builder: InstructionBuilder,
126    ralph: HatlessRalph,
127    /// Cached human guidance messages that should persist across iterations.
128    robot_guidance: Vec<String>,
129    /// Event reader for consuming events from JSONL file.
130    /// Made pub(crate) to allow tests to override the path.
131    pub(crate) event_reader: EventReader,
132    diagnostics: crate::diagnostics::DiagnosticsCollector,
133    /// Loop context for path resolution (None for legacy single-loop mode).
134    loop_context: Option<LoopContext>,
135    /// Skill registry for the current loop.
136    skill_registry: SkillRegistry,
137    /// Telegram service for human-in-the-loop communication.
138    /// Only initialized when `human.enabled` is true and this is the primary loop.
139    telegram_service: Option<TelegramService>,
140}
141
142impl EventLoop {
143    /// Creates a new event loop from configuration.
144    pub fn new(config: RalphConfig) -> Self {
145        // Try to create diagnostics collector, but fall back to disabled if it fails
146        // (e.g., in tests without proper directory setup)
147        let diagnostics = crate::diagnostics::DiagnosticsCollector::new(std::path::Path::new("."))
148            .unwrap_or_else(|e| {
149                debug!(
150                    "Failed to initialize diagnostics: {}, using disabled collector",
151                    e
152                );
153                crate::diagnostics::DiagnosticsCollector::disabled()
154            });
155
156        Self::with_diagnostics(config, diagnostics)
157    }
158
159    /// Creates a new event loop with a loop context for path resolution.
160    ///
161    /// The loop context determines where events, tasks, and other state files
162    /// are located. Use this for multi-loop scenarios where each loop runs
163    /// in an isolated workspace (git worktree).
164    pub fn with_context(config: RalphConfig, context: LoopContext) -> Self {
165        let diagnostics = crate::diagnostics::DiagnosticsCollector::new(context.workspace())
166            .unwrap_or_else(|e| {
167                debug!(
168                    "Failed to initialize diagnostics: {}, using disabled collector",
169                    e
170                );
171                crate::diagnostics::DiagnosticsCollector::disabled()
172            });
173
174        Self::with_context_and_diagnostics(config, context, diagnostics)
175    }
176
177    /// Creates a new event loop with explicit loop context and diagnostics.
178    pub fn with_context_and_diagnostics(
179        config: RalphConfig,
180        context: LoopContext,
181        diagnostics: crate::diagnostics::DiagnosticsCollector,
182    ) -> Self {
183        let registry = HatRegistry::from_config(&config);
184        let instruction_builder = InstructionBuilder::with_events(
185            &config.event_loop.completion_promise,
186            config.core.clone(),
187            config.events.clone(),
188        );
189
190        let mut bus = EventBus::new();
191
192        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
193        // Ralph is ALWAYS registered as the universal fallback for orphaned events.
194        // Custom hats are registered first (higher priority), Ralph catches everything else.
195        for hat in registry.all() {
196            bus.register(hat.clone());
197        }
198
199        // Always register Ralph as catch-all coordinator
200        // Per spec: "Ralph runs when no hat triggered — Universal fallback for orphaned events"
201        let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); // Subscribe to all events
202        bus.register(ralph_hat);
203
204        if registry.is_empty() {
205            debug!("Solo mode: Ralph is the only coordinator");
206        } else {
207            debug!(
208                "Multi-hat mode: {} custom hats + Ralph as fallback",
209                registry.len()
210            );
211        }
212
213        // Build skill registry from config
214        let skill_registry = if config.skills.enabled {
215            SkillRegistry::from_config(
216                &config.skills,
217                context.workspace(),
218                Some(config.cli.backend.as_str()),
219            )
220            .unwrap_or_else(|e| {
221                warn!(
222                    "Failed to build skill registry: {}, using empty registry",
223                    e
224                );
225                SkillRegistry::new(Some(config.cli.backend.as_str()))
226            })
227        } else {
228            SkillRegistry::new(Some(config.cli.backend.as_str()))
229        };
230
231        let skill_index = if config.skills.enabled {
232            skill_registry.build_index(None)
233        } else {
234            String::new()
235        };
236
237        // When memories are enabled, add tasks CLI instructions alongside scratchpad
238        let ralph = HatlessRalph::new(
239            config.event_loop.completion_promise.clone(),
240            config.core.clone(),
241            &registry,
242            config.event_loop.starting_event.clone(),
243        )
244        .with_memories_enabled(config.memories.enabled)
245        .with_skill_index(skill_index);
246
247        // Read timestamped events path from marker file, fall back to default
248        // The marker file contains a relative path like ".ralph/events-20260127-123456.jsonl"
249        // which we resolve relative to the workspace root
250        let events_path = std::fs::read_to_string(context.current_events_marker())
251            .map(|s| {
252                let relative = s.trim();
253                context.workspace().join(relative)
254            })
255            .unwrap_or_else(|_| context.events_path());
256        let event_reader = EventReader::new(&events_path);
257
258        // Initialize Telegram service if human-in-the-loop is enabled
259        // and this is the primary loop (has a loop context with is_primary).
260        let telegram_service = Self::create_telegram_service(&config, Some(&context));
261
262        Self {
263            config,
264            registry,
265            bus,
266            state: LoopState::new(),
267            instruction_builder,
268            ralph,
269            robot_guidance: Vec::new(),
270            event_reader,
271            diagnostics,
272            loop_context: Some(context),
273            skill_registry,
274            telegram_service,
275        }
276    }
277
278    /// Creates a new event loop with explicit diagnostics collector (for testing).
279    pub fn with_diagnostics(
280        config: RalphConfig,
281        diagnostics: crate::diagnostics::DiagnosticsCollector,
282    ) -> Self {
283        let registry = HatRegistry::from_config(&config);
284        let instruction_builder = InstructionBuilder::with_events(
285            &config.event_loop.completion_promise,
286            config.core.clone(),
287            config.events.clone(),
288        );
289
290        let mut bus = EventBus::new();
291
292        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
293        // Ralph is ALWAYS registered as the universal fallback for orphaned events.
294        // Custom hats are registered first (higher priority), Ralph catches everything else.
295        for hat in registry.all() {
296            bus.register(hat.clone());
297        }
298
299        // Always register Ralph as catch-all coordinator
300        // Per spec: "Ralph runs when no hat triggered — Universal fallback for orphaned events"
301        let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); // Subscribe to all events
302        bus.register(ralph_hat);
303
304        if registry.is_empty() {
305            debug!("Solo mode: Ralph is the only coordinator");
306        } else {
307            debug!(
308                "Multi-hat mode: {} custom hats + Ralph as fallback",
309                registry.len()
310            );
311        }
312
313        // Build skill registry from config
314        let workspace_root = std::path::Path::new(".");
315        let skill_registry = if config.skills.enabled {
316            SkillRegistry::from_config(
317                &config.skills,
318                workspace_root,
319                Some(config.cli.backend.as_str()),
320            )
321            .unwrap_or_else(|e| {
322                warn!(
323                    "Failed to build skill registry: {}, using empty registry",
324                    e
325                );
326                SkillRegistry::new(Some(config.cli.backend.as_str()))
327            })
328        } else {
329            SkillRegistry::new(Some(config.cli.backend.as_str()))
330        };
331
332        let skill_index = if config.skills.enabled {
333            skill_registry.build_index(None)
334        } else {
335            String::new()
336        };
337
338        // When memories are enabled, add tasks CLI instructions alongside scratchpad
339        let ralph = HatlessRalph::new(
340            config.event_loop.completion_promise.clone(),
341            config.core.clone(),
342            &registry,
343            config.event_loop.starting_event.clone(),
344        )
345        .with_memories_enabled(config.memories.enabled)
346        .with_skill_index(skill_index);
347
348        // Read events path from marker file, fall back to default if not present
349        // The marker file is written by run_loop_impl() at run startup
350        let events_path = std::fs::read_to_string(".ralph/current-events")
351            .map(|s| s.trim().to_string())
352            .unwrap_or_else(|_| ".ralph/events.jsonl".to_string());
353        let event_reader = EventReader::new(&events_path);
354
355        // Initialize Telegram service if human-in-the-loop is enabled.
356        // Legacy single-loop mode (no context) is treated as primary.
357        let telegram_service = Self::create_telegram_service(&config, None);
358
359        Self {
360            config,
361            registry,
362            bus,
363            state: LoopState::new(),
364            instruction_builder,
365            ralph,
366            robot_guidance: Vec::new(),
367            event_reader,
368            diagnostics,
369            loop_context: None,
370            skill_registry,
371            telegram_service,
372        }
373    }
374
375    /// Attempts to create and start a `TelegramService` if human-in-the-loop is enabled.
376    ///
377    /// The service is only created when:
378    /// - `human.enabled` is `true` in config
379    /// - This is the primary loop (holds `.ralph/loop.lock`), or no context is provided
380    ///   (legacy single-loop mode, treated as primary)
381    ///
382    /// Returns `None` if the service should not be started or if startup fails.
383    fn create_telegram_service(
384        config: &RalphConfig,
385        context: Option<&LoopContext>,
386    ) -> Option<TelegramService> {
387        if !config.robot.enabled {
388            return None;
389        }
390
391        // Only the primary loop starts the Telegram service.
392        // If we have a context, check is_primary. No context = legacy single-loop = primary.
393        if let Some(ctx) = context
394            && !ctx.is_primary()
395        {
396            debug!(
397                workspace = %ctx.workspace().display(),
398                "Skipping Telegram service: not the primary loop"
399            );
400            return None;
401        }
402
403        let workspace_root = context
404            .map(|ctx| ctx.workspace().to_path_buf())
405            .unwrap_or_else(|| config.core.workspace_root.clone());
406
407        let bot_token = config.robot.resolve_bot_token();
408        let timeout_secs = config.robot.timeout_seconds.unwrap_or(300);
409        let loop_id = context
410            .and_then(|ctx| ctx.loop_id().map(String::from))
411            .unwrap_or_else(|| "main".to_string());
412
413        match TelegramService::new(workspace_root, bot_token, timeout_secs, loop_id) {
414            Ok(service) => {
415                if let Err(e) = service.start() {
416                    warn!(error = %e, "Failed to start Telegram service");
417                    return None;
418                }
419                info!(
420                    bot_token = %service.bot_token_masked(),
421                    timeout_secs = service.timeout_secs(),
422                    "Telegram human-in-the-loop service active"
423                );
424                Some(service)
425            }
426            Err(e) => {
427                warn!(error = %e, "Failed to create Telegram service");
428                None
429            }
430        }
431    }
432
433    /// Returns the loop context, if one was provided.
434    pub fn loop_context(&self) -> Option<&LoopContext> {
435        self.loop_context.as_ref()
436    }
437
438    /// Returns the tasks path based on loop context or default.
439    fn tasks_path(&self) -> PathBuf {
440        self.loop_context
441            .as_ref()
442            .map(|ctx| ctx.tasks_path())
443            .unwrap_or_else(|| PathBuf::from(".ralph/agent/tasks.jsonl"))
444    }
445
446    /// Returns the scratchpad path based on loop context or config.
447    fn scratchpad_path(&self) -> PathBuf {
448        self.loop_context
449            .as_ref()
450            .map(|ctx| ctx.scratchpad_path())
451            .unwrap_or_else(|| PathBuf::from(&self.config.core.scratchpad))
452    }
453
454    /// Returns the current loop state.
455    pub fn state(&self) -> &LoopState {
456        &self.state
457    }
458
459    /// Returns the configuration.
460    pub fn config(&self) -> &RalphConfig {
461        &self.config
462    }
463
464    /// Returns the hat registry.
465    pub fn registry(&self) -> &HatRegistry {
466        &self.registry
467    }
468
469    /// Gets the backend configuration for a hat.
470    ///
471    /// If the hat has a backend configured, returns that.
472    /// Otherwise, returns None (caller should use global backend).
473    pub fn get_hat_backend(&self, hat_id: &HatId) -> Option<&HatBackend> {
474        self.registry
475            .get_config(hat_id)
476            .and_then(|config| config.backend.as_ref())
477    }
478
479    /// Adds an observer that receives all published events.
480    ///
481    /// Multiple observers can be added (e.g., session recorder + TUI).
482    /// Each observer is called before events are routed to subscribers.
483    pub fn add_observer<F>(&mut self, observer: F)
484    where
485        F: Fn(&Event) + Send + 'static,
486    {
487        self.bus.add_observer(observer);
488    }
489
490    /// Sets a single observer, clearing any existing observers.
491    ///
492    /// Prefer `add_observer` when multiple observers are needed.
493    #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
494    pub fn set_observer<F>(&mut self, observer: F)
495    where
496        F: Fn(&Event) + Send + 'static,
497    {
498        #[allow(deprecated)]
499        self.bus.set_observer(observer);
500    }
501
502    /// Checks if any termination condition is met.
503    pub fn check_termination(&self) -> Option<TerminationReason> {
504        let cfg = &self.config.event_loop;
505
506        if self.state.iteration >= cfg.max_iterations {
507            return Some(TerminationReason::MaxIterations);
508        }
509
510        if self.state.elapsed().as_secs() >= cfg.max_runtime_seconds {
511            return Some(TerminationReason::MaxRuntime);
512        }
513
514        if let Some(max_cost) = cfg.max_cost_usd
515            && self.state.cumulative_cost >= max_cost
516        {
517            return Some(TerminationReason::MaxCost);
518        }
519
520        if self.state.consecutive_failures >= cfg.max_consecutive_failures {
521            return Some(TerminationReason::ConsecutiveFailures);
522        }
523
524        // Check for loop thrashing: planner keeps dispatching abandoned tasks
525        if self.state.abandoned_task_redispatches >= 3 {
526            return Some(TerminationReason::LoopThrashing);
527        }
528
529        // Check for validation failures: too many consecutive malformed JSONL lines
530        if self.state.consecutive_malformed_events >= 3 {
531            return Some(TerminationReason::ValidationFailure);
532        }
533
534        // Check for restart signal from Telegram /restart command
535        let restart_path =
536            std::path::Path::new(&self.config.core.workspace_root).join(".ralph/restart-requested");
537        if restart_path.exists() {
538            return Some(TerminationReason::RestartRequested);
539        }
540
541        None
542    }
543
544    /// Initializes the loop by publishing the start event.
545    pub fn initialize(&mut self, prompt_content: &str) {
546        // Use configured starting_event or default to task.start for backward compatibility
547        let topic = self
548            .config
549            .event_loop
550            .starting_event
551            .clone()
552            .unwrap_or_else(|| "task.start".to_string());
553        self.initialize_with_topic(&topic, prompt_content);
554    }
555
556    /// Initializes the loop for resume mode by publishing task.resume.
557    ///
558    /// Per spec: "User can run `ralph resume` to restart reading existing scratchpad."
559    /// The planner should read the existing scratchpad rather than doing fresh gap analysis.
560    pub fn initialize_resume(&mut self, prompt_content: &str) {
561        // Resume always uses task.resume regardless of starting_event config
562        self.initialize_with_topic("task.resume", prompt_content);
563    }
564
565    /// Common initialization logic with configurable topic.
566    fn initialize_with_topic(&mut self, topic: &str, prompt_content: &str) {
567        // Store the objective so it persists across all iterations.
568        // After iteration 1, bus.take_pending() consumes the start event,
569        // so without this the objective would be invisible to later hats.
570        self.ralph.set_objective(prompt_content.to_string());
571
572        let start_event = Event::new(topic, prompt_content);
573        self.bus.publish(start_event);
574        debug!(topic = topic, "Published {} event", topic);
575    }
576
577    /// Gets the next hat to execute (if any have pending events).
578    ///
579    /// Per "Hatless Ralph" architecture: When custom hats are defined, Ralph is
580    /// always the executor. Custom hats define topology (pub/sub contracts) that
581    /// Ralph uses for coordination context, but Ralph handles all iterations.
582    ///
583    /// - Solo mode (no custom hats): Returns "ralph" if Ralph has pending events
584    /// - Multi-hat mode (custom hats defined): Always returns "ralph" if ANY hat has pending events
585    pub fn next_hat(&self) -> Option<&HatId> {
586        let next = self.bus.next_hat_with_pending();
587
588        // If no pending events, return None
589        next.as_ref()?;
590
591        // In multi-hat mode, always route to Ralph (custom hats define topology only)
592        // Ralph's prompt includes the ## HATS section for coordination awareness
593        if self.registry.is_empty() {
594            // Solo mode - return the next hat (which is "ralph")
595            next
596        } else {
597            // Return "ralph" - the constant coordinator
598            // Find ralph in the bus's registered hats
599            self.bus.hat_ids().find(|id| id.as_str() == "ralph")
600        }
601    }
602
603    /// Checks if any hats have pending events.
604    ///
605    /// Use this after `process_output` to detect if the LLM failed to publish an event.
606    /// If false after processing, the loop will terminate on the next iteration.
607    pub fn has_pending_events(&self) -> bool {
608        self.bus.next_hat_with_pending().is_some()
609    }
610
611    /// Checks if any pending events are human-related (human.response, human.guidance).
612    ///
613    /// Used to skip cooldown delays when a human event is next, since we don't
614    /// want to artificially delay the response to a human interaction.
615    pub fn has_pending_human_events(&self) -> bool {
616        self.bus.hat_ids().any(|hat_id| {
617            self.bus
618                .peek_pending(hat_id)
619                .map(|events| {
620                    events.iter().any(|e| {
621                        let topic = e.topic.as_str();
622                        topic == "human.response" || topic == "human.guidance"
623                    })
624                })
625                .unwrap_or(false)
626        })
627    }
628
629    /// Gets the topics a hat is allowed to publish.
630    ///
631    /// Used to build retry prompts when the LLM forgets to publish an event.
632    pub fn get_hat_publishes(&self, hat_id: &HatId) -> Vec<String> {
633        self.registry
634            .get(hat_id)
635            .map(|hat| hat.publishes.iter().map(|t| t.to_string()).collect())
636            .unwrap_or_default()
637    }
638
639    /// Injects a fallback event to recover from a stalled loop.
640    ///
641    /// When no hats have pending events (agent failed to publish), this method
642    /// injects a `task.resume` event which Ralph will handle to attempt recovery.
643    ///
644    /// Returns true if a fallback event was injected, false if recovery is not possible.
645    pub fn inject_fallback_event(&mut self) -> bool {
646        let fallback_event = Event::new(
647            "task.resume",
648            "RECOVERY: Previous iteration did not publish an event. \
649             Review the scratchpad and either dispatch the next task or complete the loop.",
650        );
651
652        // If a custom hat was last executing, target the fallback back to it
653        // This preserves hat context instead of always falling back to Ralph
654        let fallback_event = match &self.state.last_hat {
655            Some(hat_id) if hat_id.as_str() != "ralph" => {
656                debug!(
657                    hat = %hat_id.as_str(),
658                    "Injecting fallback event to recover - targeting last hat with task.resume"
659                );
660                fallback_event.with_target(hat_id.clone())
661            }
662            _ => {
663                debug!("Injecting fallback event to recover - triggering Ralph with task.resume");
664                fallback_event
665            }
666        };
667
668        self.bus.publish(fallback_event);
669        true
670    }
671
672    /// Builds the prompt for a hat's execution.
673    ///
674    /// Per "Hatless Ralph" architecture:
675    /// - Solo mode: Ralph handles everything with his own prompt
676    /// - Multi-hat mode: Ralph is the sole executor, custom hats define topology only
677    ///
678    /// When in multi-hat mode, this method collects ALL pending events across all hats
679    /// and builds Ralph's prompt with that context. The `## HATS` section in Ralph's
680    /// prompt documents the topology for coordination awareness.
681    ///
682    /// If memories are configured with `inject: auto`, this method also prepends
683    /// primed memories to the prompt context. If a scratchpad file exists and is
684    /// non-empty, its content is also prepended (before memories).
685    pub fn build_prompt(&mut self, hat_id: &HatId) -> Option<String> {
686        // Handle "ralph" hat - the constant coordinator
687        // Per spec: "Hatless Ralph is constant — Cannot be replaced, overwritten, or configured away"
688        if hat_id.as_str() == "ralph" {
689            if self.registry.is_empty() {
690                // Solo mode - just Ralph's events, no hats to filter
691                let events = self.bus.take_pending(&hat_id.clone());
692
693                // Separate human.guidance events from regular events
694                let (guidance_events, regular_events): (Vec<_>, Vec<_>) = events
695                    .into_iter()
696                    .partition(|e| e.topic.as_str() == "human.guidance");
697
698                let events_context = regular_events
699                    .iter()
700                    .map(|e| Self::format_event(e))
701                    .collect::<Vec<_>>()
702                    .join("\n");
703
704                // Persist and inject human guidance into prompt if present
705                self.update_robot_guidance(guidance_events);
706                self.apply_robot_guidance();
707
708                // Build base prompt and prepend memories + scratchpad + ready tasks
709                let base_prompt = self.ralph.build_prompt(&events_context, &[]);
710                self.ralph.clear_robot_guidance();
711                let with_skills = self.prepend_auto_inject_skills(base_prompt);
712                let with_scratchpad = self.prepend_scratchpad(with_skills);
713                let final_prompt = self.prepend_ready_tasks(with_scratchpad);
714
715                debug!("build_prompt: routing to HatlessRalph (solo mode)");
716                return Some(final_prompt);
717            } else {
718                // Multi-hat mode: collect events and determine active hats
719                let mut all_hat_ids: Vec<HatId> = self.bus.hat_ids().cloned().collect();
720                // Deterministic ordering (avoid HashMap iteration order nondeterminism).
721                all_hat_ids.sort_by(|a, b| a.as_str().cmp(b.as_str()));
722
723                let mut all_events = Vec::new();
724                let mut system_events = Vec::new();
725
726                for id in &all_hat_ids {
727                    let pending = self.bus.take_pending(id);
728                    if pending.is_empty() {
729                        continue;
730                    }
731
732                    let (drop_pending, exhausted_event) = self.check_hat_exhaustion(id, &pending);
733                    if drop_pending {
734                        // Drop the pending events that would have activated the hat.
735                        if let Some(exhausted_event) = exhausted_event {
736                            all_events.push(exhausted_event.clone());
737                            system_events.push(exhausted_event);
738                        }
739                        continue;
740                    }
741
742                    all_events.extend(pending);
743                }
744
745                // Publish orchestrator-generated system events after consuming pending events,
746                // so they become visible in the event log and can be handled next iteration.
747                for event in system_events {
748                    self.bus.publish(event);
749                }
750
751                // Separate human.guidance events from regular events
752                let (guidance_events, regular_events): (Vec<_>, Vec<_>) = all_events
753                    .into_iter()
754                    .partition(|e| e.topic.as_str() == "human.guidance");
755
756                // Persist and inject human guidance before building prompt (must happen before
757                // immutable borrows from determine_active_hats)
758                self.update_robot_guidance(guidance_events);
759                self.apply_robot_guidance();
760
761                // Determine which hats are active based on regular events
762                let active_hat_ids = self.determine_active_hat_ids(&regular_events);
763                self.record_hat_activations(&active_hat_ids);
764                let active_hats = self.determine_active_hats(&regular_events);
765
766                // Format events for context
767                let events_context = regular_events
768                    .iter()
769                    .map(|e| Self::format_event(e))
770                    .collect::<Vec<_>>()
771                    .join("\n");
772
773                // Build base prompt and prepend memories + scratchpad if available
774                let base_prompt = self.ralph.build_prompt(&events_context, &active_hats);
775
776                // Build prompt with active hats - filters instructions to only active hats
777                debug!(
778                    "build_prompt: routing to HatlessRalph (multi-hat coordinator mode), active_hats: {:?}",
779                    active_hats
780                        .iter()
781                        .map(|h| h.id.as_str())
782                        .collect::<Vec<_>>()
783                );
784
785                // Clear guidance after active_hats references are no longer needed
786                self.ralph.clear_robot_guidance();
787                let with_skills = self.prepend_auto_inject_skills(base_prompt);
788                let with_scratchpad = self.prepend_scratchpad(with_skills);
789                let final_prompt = self.prepend_ready_tasks(with_scratchpad);
790
791                return Some(final_prompt);
792            }
793        }
794
795        // Non-ralph hat requested - this shouldn't happen in multi-hat mode since
796        // next_hat() always returns "ralph" when custom hats are defined.
797        // But we keep this code path for backward compatibility and tests.
798        let events = self.bus.take_pending(&hat_id.clone());
799        let events_context = events
800            .iter()
801            .map(|e| Self::format_event(e))
802            .collect::<Vec<_>>()
803            .join("\n");
804
805        let hat = self.registry.get(hat_id)?;
806
807        // Debug logging to trace hat routing
808        debug!(
809            "build_prompt: hat_id='{}', instructions.is_empty()={}",
810            hat_id.as_str(),
811            hat.instructions.is_empty()
812        );
813
814        // All hats use build_custom_hat with ghuntley-style prompts
815        debug!(
816            "build_prompt: routing to build_custom_hat() for '{}'",
817            hat_id.as_str()
818        );
819        Some(
820            self.instruction_builder
821                .build_custom_hat(hat, &events_context),
822        )
823    }
824
825    /// Stores guidance payloads, persists them to scratchpad, and prepares them for prompt injection.
826    ///
827    /// Guidance events are ephemeral in the event bus (consumed by `take_pending`).
828    /// This method both caches them in memory for prompt injection and appends
829    /// them to the scratchpad file so they survive across process restarts.
830    fn update_robot_guidance(&mut self, guidance_events: Vec<Event>) {
831        if guidance_events.is_empty() {
832            return;
833        }
834
835        // Persist new guidance to scratchpad before caching
836        self.persist_guidance_to_scratchpad(&guidance_events);
837
838        self.robot_guidance
839            .extend(guidance_events.into_iter().map(|e| e.payload));
840    }
841
842    /// Appends human guidance entries to the scratchpad file for durability.
843    ///
844    /// Each guidance message is written as a timestamped markdown entry so it
845    /// appears alongside the agent's own thinking and survives process restarts.
846    fn persist_guidance_to_scratchpad(&self, guidance_events: &[Event]) {
847        use std::io::Write;
848
849        let scratchpad_path = self.scratchpad_path();
850        let resolved_path = if scratchpad_path.is_relative() {
851            self.config.core.workspace_root.join(&scratchpad_path)
852        } else {
853            scratchpad_path
854        };
855
856        // Create parent directories if needed
857        if let Some(parent) = resolved_path.parent()
858            && !parent.exists()
859            && let Err(e) = std::fs::create_dir_all(parent)
860        {
861            warn!("Failed to create scratchpad directory: {}", e);
862            return;
863        }
864
865        let mut file = match std::fs::OpenOptions::new()
866            .create(true)
867            .append(true)
868            .open(&resolved_path)
869        {
870            Ok(f) => f,
871            Err(e) => {
872                warn!("Failed to open scratchpad for guidance persistence: {}", e);
873                return;
874            }
875        };
876
877        let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
878        for event in guidance_events {
879            let entry = format!(
880                "\n### HUMAN GUIDANCE ({})\n\n{}\n",
881                timestamp, event.payload
882            );
883            if let Err(e) = file.write_all(entry.as_bytes()) {
884                warn!("Failed to write guidance to scratchpad: {}", e);
885            }
886        }
887
888        info!(
889            count = guidance_events.len(),
890            "Persisted human guidance to scratchpad"
891        );
892    }
893
894    /// Injects cached guidance into the next prompt build.
895    fn apply_robot_guidance(&mut self) {
896        if self.robot_guidance.is_empty() {
897            return;
898        }
899
900        self.ralph.set_robot_guidance(self.robot_guidance.clone());
901    }
902
903    /// Prepends auto-injected skill content to the prompt.
904    ///
905    /// This generalizes the former `prepend_memories()` into a skill auto-injection
906    /// pipeline that handles memories, tools, and any other auto-inject skills.
907    ///
908    /// Injection order:
909    /// 1. Memory data + ralph-tools skill (special case: loads memory data from store, applies budget)
910    /// 2. RObot interaction skill (gated by `robot.enabled`)
911    /// 3. Other auto-inject skills from the registry (wrapped in XML tags)
912    fn prepend_auto_inject_skills(&self, prompt: String) -> String {
913        let mut prefix = String::new();
914
915        // 1. Memory data + ralph-tools skill — special case with data loading
916        self.inject_memories_and_tools_skill(&mut prefix);
917
918        // 2. RObot interaction skill — gated by robot.enabled
919        self.inject_robot_skill(&mut prefix);
920
921        // 3. Other auto-inject skills from the registry
922        self.inject_custom_auto_skills(&mut prefix);
923
924        if prefix.is_empty() {
925            return prompt;
926        }
927
928        prefix.push_str("\n\n");
929        prefix.push_str(&prompt);
930        prefix
931    }
932
933    /// Injects memory data and the ralph-tools skill into the prefix.
934    ///
935    /// Special case: loads memory entries from the store, applies budget
936    /// truncation, then appends the ralph-tools skill content (which covers
937    /// both tasks and memories CLI usage).
938    /// Memory data is gated by `memories.enabled && memories.inject == Auto`.
939    /// The ralph-tools skill is injected when either memories or tasks are enabled.
940    fn inject_memories_and_tools_skill(&self, prefix: &mut String) {
941        let memories_config = &self.config.memories;
942
943        // Inject memory DATA if memories are enabled with auto-inject
944        if memories_config.enabled && memories_config.inject == InjectMode::Auto {
945            info!(
946                "Memory injection check: enabled={}, inject={:?}, workspace_root={:?}",
947                memories_config.enabled, memories_config.inject, self.config.core.workspace_root
948            );
949
950            let workspace_root = &self.config.core.workspace_root;
951            let store = MarkdownMemoryStore::with_default_path(workspace_root);
952            let memories_path = workspace_root.join(".ralph/agent/memories.md");
953
954            info!(
955                "Looking for memories at: {:?} (exists: {})",
956                memories_path,
957                memories_path.exists()
958            );
959
960            let memories = match store.load() {
961                Ok(memories) => {
962                    info!("Successfully loaded {} memories from store", memories.len());
963                    memories
964                }
965                Err(e) => {
966                    info!(
967                        "Failed to load memories for injection: {} (path: {:?})",
968                        e, memories_path
969                    );
970                    Vec::new()
971                }
972            };
973
974            if memories.is_empty() {
975                info!("Memory store is empty - no memories to inject");
976            } else {
977                let mut memories_content = format_memories_as_markdown(&memories);
978
979                if memories_config.budget > 0 {
980                    let original_len = memories_content.len();
981                    memories_content =
982                        truncate_to_budget(&memories_content, memories_config.budget);
983                    debug!(
984                        "Applied budget: {} chars -> {} chars (budget: {})",
985                        original_len,
986                        memories_content.len(),
987                        memories_config.budget
988                    );
989                }
990
991                info!(
992                    "Injecting {} memories ({} chars) into prompt",
993                    memories.len(),
994                    memories_content.len()
995                );
996
997                prefix.push_str(&memories_content);
998            }
999        }
1000
1001        // Inject the ralph-tools skill when either memories or tasks are enabled
1002        if memories_config.enabled || self.config.tasks.enabled {
1003            if let Some(skill) = self.skill_registry.get("ralph-tools") {
1004                if !prefix.is_empty() {
1005                    prefix.push_str("\n\n");
1006                }
1007                prefix.push_str(&format!(
1008                    "<ralph-tools-skill>\n{}\n</ralph-tools-skill>",
1009                    skill.content.trim()
1010                ));
1011                debug!("Injected ralph-tools skill from registry");
1012            } else {
1013                debug!("ralph-tools skill not found in registry - skill content not injected");
1014            }
1015        }
1016    }
1017
1018    /// Injects the RObot interaction skill content into the prefix.
1019    ///
1020    /// Gated by `robot.enabled`. Teaches agents how and when to interact
1021    /// with humans via `interact.human` events.
1022    fn inject_robot_skill(&self, prefix: &mut String) {
1023        if !self.config.robot.enabled {
1024            return;
1025        }
1026
1027        if let Some(skill) = self.skill_registry.get("robot-interaction") {
1028            if !prefix.is_empty() {
1029                prefix.push_str("\n\n");
1030            }
1031            prefix.push_str(&format!(
1032                "<robot-skill>\n{}\n</robot-skill>",
1033                skill.content.trim()
1034            ));
1035            debug!("Injected robot interaction skill from registry");
1036        }
1037    }
1038
1039    /// Injects any user-configured auto-inject skills (excluding built-in ralph-tools/robot-interaction).
1040    fn inject_custom_auto_skills(&self, prefix: &mut String) {
1041        for skill in self.skill_registry.auto_inject_skills(None) {
1042            // Skip built-in skills handled above
1043            if skill.name == "ralph-tools" || skill.name == "robot-interaction" {
1044                continue;
1045            }
1046
1047            if !prefix.is_empty() {
1048                prefix.push_str("\n\n");
1049            }
1050            prefix.push_str(&format!(
1051                "<{name}-skill>\n{content}\n</{name}-skill>",
1052                name = skill.name,
1053                content = skill.content.trim()
1054            ));
1055            debug!("Injected auto-inject skill: {}", skill.name);
1056        }
1057    }
1058
1059    /// Prepends scratchpad content to the prompt if the file exists and is non-empty.
1060    ///
1061    /// The scratchpad is the agent's working memory for the current objective.
1062    /// Auto-injecting saves one tool call per iteration.
1063    /// When the file exceeds the budget, the TAIL is kept (most recent entries).
1064    fn prepend_scratchpad(&self, prompt: String) -> String {
1065        let scratchpad_path = self.scratchpad_path();
1066
1067        let resolved_path = if scratchpad_path.is_relative() {
1068            self.config.core.workspace_root.join(&scratchpad_path)
1069        } else {
1070            scratchpad_path
1071        };
1072
1073        if !resolved_path.exists() {
1074            debug!(
1075                "Scratchpad not found at {:?}, skipping injection",
1076                resolved_path
1077            );
1078            return prompt;
1079        }
1080
1081        let content = match std::fs::read_to_string(&resolved_path) {
1082            Ok(c) => c,
1083            Err(e) => {
1084                info!("Failed to read scratchpad for injection: {}", e);
1085                return prompt;
1086            }
1087        };
1088
1089        if content.trim().is_empty() {
1090            debug!("Scratchpad is empty, skipping injection");
1091            return prompt;
1092        }
1093
1094        // Budget: 4000 tokens ~16000 chars. Keep the TAIL (most recent content).
1095        let char_budget = 4000 * 4;
1096        let content = if content.len() > char_budget {
1097            // Find a line boundary near the start of the tail
1098            let start = content.len() - char_budget;
1099            let line_start = content[start..].find('\n').map_or(start, |n| start + n + 1);
1100            let discarded = &content[..line_start];
1101
1102            // Summarize discarded content by extracting markdown headings
1103            let headings: Vec<&str> = discarded
1104                .lines()
1105                .filter(|line| line.starts_with('#'))
1106                .collect();
1107            let summary = if headings.is_empty() {
1108                format!(
1109                    "<!-- earlier content truncated ({} chars omitted) -->",
1110                    line_start
1111                )
1112            } else {
1113                format!(
1114                    "<!-- earlier content truncated ({} chars omitted) -->\n\
1115                     <!-- discarded sections: {} -->",
1116                    line_start,
1117                    headings.join(" | ")
1118                )
1119            };
1120
1121            format!("{}\n\n{}", summary, &content[line_start..])
1122        } else {
1123            content
1124        };
1125
1126        info!("Injecting scratchpad ({} chars) into prompt", content.len());
1127
1128        let mut final_prompt = format!(
1129            "<scratchpad path=\"{}\">\n{}\n</scratchpad>\n\n",
1130            self.config.core.scratchpad, content
1131        );
1132        final_prompt.push_str(&prompt);
1133        final_prompt
1134    }
1135
1136    /// Prepends ready tasks to the prompt if tasks are enabled and any exist.
1137    ///
1138    /// Loads the task store and formats ready (unblocked, open) tasks into
1139    /// a `<ready-tasks>` XML block. This saves the agent a tool call per
1140    /// iteration and puts tasks at the same prominence as the scratchpad.
1141    fn prepend_ready_tasks(&self, prompt: String) -> String {
1142        if !self.config.tasks.enabled {
1143            return prompt;
1144        }
1145
1146        use crate::task::TaskStatus;
1147        use crate::task_store::TaskStore;
1148
1149        let tasks_path = self.tasks_path();
1150        let resolved_path = if tasks_path.is_relative() {
1151            self.config.core.workspace_root.join(&tasks_path)
1152        } else {
1153            tasks_path
1154        };
1155
1156        if !resolved_path.exists() {
1157            return prompt;
1158        }
1159
1160        let store = match TaskStore::load(&resolved_path) {
1161            Ok(s) => s,
1162            Err(e) => {
1163                info!("Failed to load task store for injection: {}", e);
1164                return prompt;
1165            }
1166        };
1167
1168        let ready = store.ready();
1169        let open = store.open();
1170        let closed_count = store.all().len() - open.len();
1171
1172        if open.is_empty() && closed_count == 0 {
1173            return prompt;
1174        }
1175
1176        let mut section = String::from("<ready-tasks>\n");
1177        if ready.is_empty() && open.is_empty() {
1178            section.push_str("No open tasks. Create tasks with `ralph tools task add`.\n");
1179        } else {
1180            section.push_str(&format!(
1181                "## Tasks: {} ready, {} open, {} closed\n\n",
1182                ready.len(),
1183                open.len(),
1184                closed_count
1185            ));
1186            for task in &ready {
1187                let status_icon = match task.status {
1188                    TaskStatus::Open => "[ ]",
1189                    TaskStatus::InProgress => "[~]",
1190                    _ => "[?]",
1191                };
1192                section.push_str(&format!(
1193                    "- {} [P{}] {} ({})\n",
1194                    status_icon, task.priority, task.title, task.id
1195                ));
1196            }
1197            // Show blocked tasks separately so agent knows they exist
1198            let ready_ids: Vec<&str> = ready.iter().map(|t| t.id.as_str()).collect();
1199            let blocked: Vec<_> = open
1200                .iter()
1201                .filter(|t| !ready_ids.contains(&t.id.as_str()))
1202                .collect();
1203            if !blocked.is_empty() {
1204                section.push_str("\nBlocked:\n");
1205                for task in blocked {
1206                    section.push_str(&format!(
1207                        "- [blocked] [P{}] {} ({}) — blocked by: {}\n",
1208                        task.priority,
1209                        task.title,
1210                        task.id,
1211                        task.blocked_by.join(", ")
1212                    ));
1213                }
1214            }
1215        }
1216        section.push_str("</ready-tasks>\n\n");
1217
1218        info!(
1219            "Injecting ready tasks ({} ready, {} open, {} closed) into prompt",
1220            ready.len(),
1221            open.len(),
1222            closed_count
1223        );
1224
1225        let mut final_prompt = section;
1226        final_prompt.push_str(&prompt);
1227        final_prompt
1228    }
1229
1230    /// Builds the Ralph prompt (coordination mode).
1231    pub fn build_ralph_prompt(&self, prompt_content: &str) -> String {
1232        self.ralph.build_prompt(prompt_content, &[])
1233    }
1234
1235    /// Determines which hats should be active based on pending events.
1236    /// Returns list of Hat references that are triggered by any pending event.
1237    fn determine_active_hats(&self, events: &[Event]) -> Vec<&Hat> {
1238        let mut active_hats = Vec::new();
1239        for id in self.determine_active_hat_ids(events) {
1240            if let Some(hat) = self.registry.get(&id) {
1241                active_hats.push(hat);
1242            }
1243        }
1244        active_hats
1245    }
1246
1247    fn determine_active_hat_ids(&self, events: &[Event]) -> Vec<HatId> {
1248        let mut active_hat_ids = Vec::new();
1249        for event in events {
1250            if let Some(hat) = self.registry.get_for_topic(event.topic.as_str()) {
1251                // Avoid duplicates
1252                if !active_hat_ids.iter().any(|id| id == &hat.id) {
1253                    active_hat_ids.push(hat.id.clone());
1254                }
1255            }
1256        }
1257        active_hat_ids
1258    }
1259
1260    /// Formats an event for prompt context.
1261    ///
1262    /// For top-level prompts (task.start, task.resume), wraps the payload in
1263    /// `<top-level-prompt>` XML tags to clearly delineate the user's original request.
1264    fn format_event(event: &Event) -> String {
1265        let topic = &event.topic;
1266        let payload = &event.payload;
1267
1268        if topic.as_str() == "task.start" || topic.as_str() == "task.resume" {
1269            format!(
1270                "Event: {} - <top-level-prompt>\n{}\n</top-level-prompt>",
1271                topic, payload
1272            )
1273        } else {
1274            format!("Event: {} - {}", topic, payload)
1275        }
1276    }
1277
1278    fn check_hat_exhaustion(&mut self, hat_id: &HatId, dropped: &[Event]) -> (bool, Option<Event>) {
1279        let Some(config) = self.registry.get_config(hat_id) else {
1280            return (false, None);
1281        };
1282        let Some(max) = config.max_activations else {
1283            return (false, None);
1284        };
1285
1286        let count = *self.state.hat_activation_counts.get(hat_id).unwrap_or(&0);
1287        if count < max {
1288            return (false, None);
1289        }
1290
1291        // Emit only once per hat per run (avoid flooding).
1292        let should_emit = self.state.exhausted_hats.insert(hat_id.clone());
1293
1294        if !should_emit {
1295            // Hat is already exhausted - drop pending events silently.
1296            return (true, None);
1297        }
1298
1299        let mut dropped_topics: Vec<String> = dropped.iter().map(|e| e.topic.to_string()).collect();
1300        dropped_topics.sort();
1301
1302        let payload = format!(
1303            "Hat '{hat}' exhausted.\n- max_activations: {max}\n- activations: {count}\n- dropped_topics:\n  - {topics}",
1304            hat = hat_id.as_str(),
1305            max = max,
1306            count = count,
1307            topics = dropped_topics.join("\n  - ")
1308        );
1309
1310        warn!(
1311            hat = %hat_id.as_str(),
1312            max_activations = max,
1313            activations = count,
1314            "Hat exhausted (max_activations reached)"
1315        );
1316
1317        (
1318            true,
1319            Some(Event::new(
1320                format!("{}.exhausted", hat_id.as_str()),
1321                payload,
1322            )),
1323        )
1324    }
1325
1326    fn record_hat_activations(&mut self, active_hat_ids: &[HatId]) {
1327        for hat_id in active_hat_ids {
1328            *self
1329                .state
1330                .hat_activation_counts
1331                .entry(hat_id.clone())
1332                .or_insert(0) += 1;
1333        }
1334    }
1335
1336    /// Returns the primary active hat ID for display purposes.
1337    /// Returns the first active hat, or "ralph" if no specific hat is active.
1338    pub fn get_active_hat_id(&self) -> HatId {
1339        // Peek at pending events (don't consume them)
1340        for hat_id in self.bus.hat_ids() {
1341            let Some(events) = self.bus.peek_pending(hat_id) else {
1342                continue;
1343            };
1344            let Some(event) = events.first() else {
1345                continue;
1346            };
1347            if let Some(active_hat) = self.registry.get_for_topic(event.topic.as_str()) {
1348                return active_hat.id.clone();
1349            }
1350        }
1351        HatId::new("ralph")
1352    }
1353
1354    /// Records the current event count before hat execution.
1355    ///
1356    /// Call this before executing a hat, then use `check_default_publishes`
1357    /// after execution to inject a fallback event if needed.
1358    pub fn record_event_count(&mut self) -> usize {
1359        self.event_reader
1360            .read_new_events()
1361            .map(|r| r.events.len())
1362            .unwrap_or(0)
1363    }
1364
1365    /// Checks if hat wrote any events, and injects default if configured.
1366    ///
1367    /// Call this after hat execution with the count from `record_event_count`.
1368    /// If no new events were written AND the hat has `default_publishes` configured,
1369    /// this will inject the default event automatically.
1370    pub fn check_default_publishes(&mut self, hat_id: &HatId, _events_before: usize) {
1371        let events_after = self
1372            .event_reader
1373            .read_new_events()
1374            .map(|r| r.events.len())
1375            .unwrap_or(0);
1376
1377        if events_after == 0
1378            && let Some(config) = self.registry.get_config(hat_id)
1379            && let Some(default_topic) = &config.default_publishes
1380        {
1381            // No new events written - inject default event
1382            let default_event = Event::new(default_topic.as_str(), "").with_source(hat_id.clone());
1383
1384            debug!(
1385                hat = %hat_id.as_str(),
1386                topic = %default_topic,
1387                "No events written by hat, injecting default_publishes event"
1388            );
1389
1390            self.bus.publish(default_event);
1391        }
1392    }
1393
1394    /// Returns a mutable reference to the event bus for direct event publishing.
1395    ///
1396    /// This is primarily used for planning sessions to inject user responses
1397    /// as events into the orchestration loop.
1398    pub fn bus(&mut self) -> &mut EventBus {
1399        &mut self.bus
1400    }
1401
1402    /// Processes output from a hat execution.
1403    ///
1404    /// Returns the termination reason if the loop should stop.
1405    pub fn process_output(
1406        &mut self,
1407        hat_id: &HatId,
1408        output: &str,
1409        success: bool,
1410    ) -> Option<TerminationReason> {
1411        self.state.iteration += 1;
1412        self.state.last_hat = Some(hat_id.clone());
1413
1414        // Periodic Telegram check-in
1415        if let Some(interval_secs) = self.config.robot.checkin_interval_seconds
1416            && let Some(ref telegram_service) = self.telegram_service
1417        {
1418            let elapsed = self.state.elapsed();
1419            let interval = std::time::Duration::from_secs(interval_secs);
1420            let last = self
1421                .state
1422                .last_checkin_at
1423                .map(|t| t.elapsed())
1424                .unwrap_or(elapsed);
1425
1426            if last >= interval {
1427                let context = self.build_checkin_context(hat_id);
1428                match telegram_service.send_checkin(self.state.iteration, elapsed, Some(&context)) {
1429                    Ok(_) => {
1430                        self.state.last_checkin_at = Some(std::time::Instant::now());
1431                        debug!(iteration = self.state.iteration, "Sent Telegram check-in");
1432                    }
1433                    Err(e) => {
1434                        warn!(error = %e, "Failed to send Telegram check-in");
1435                    }
1436                }
1437            }
1438        }
1439
1440        // Log iteration started
1441        self.diagnostics.log_orchestration(
1442            self.state.iteration,
1443            "loop",
1444            crate::diagnostics::OrchestrationEvent::IterationStarted,
1445        );
1446
1447        // Log hat selected
1448        self.diagnostics.log_orchestration(
1449            self.state.iteration,
1450            "loop",
1451            crate::diagnostics::OrchestrationEvent::HatSelected {
1452                hat: hat_id.to_string(),
1453                reason: "process_output".to_string(),
1454            },
1455        );
1456
1457        // Track failures
1458        if success {
1459            self.state.consecutive_failures = 0;
1460        } else {
1461            self.state.consecutive_failures += 1;
1462        }
1463
1464        // Check for completion promise - only valid from Ralph (the coordinator)
1465        // Trust the agent's decision to complete - it knows when the objective is done.
1466        // Open tasks are logged as a warning but do not block completion.
1467        if hat_id.as_str() == "ralph"
1468            && EventParser::contains_promise(output, &self.config.event_loop.completion_promise)
1469        {
1470            // Log warning if tasks remain open (informational only)
1471            if self.config.memories.enabled {
1472                if let Ok(false) = self.verify_tasks_complete() {
1473                    let open_tasks = self.get_open_task_list();
1474                    warn!(
1475                        open_tasks = ?open_tasks,
1476                        "LOOP_COMPLETE with {} open task(s) - trusting agent decision",
1477                        open_tasks.len()
1478                    );
1479                }
1480            } else if let Ok(false) = self.verify_scratchpad_complete() {
1481                warn!("LOOP_COMPLETE with pending scratchpad tasks - trusting agent decision");
1482            }
1483
1484            // Trust the agent - terminate immediately
1485            info!("LOOP_COMPLETE detected - terminating");
1486
1487            // Log loop terminated
1488            self.diagnostics.log_orchestration(
1489                self.state.iteration,
1490                "loop",
1491                crate::diagnostics::OrchestrationEvent::LoopTerminated {
1492                    reason: "completion_promise".to_string(),
1493                },
1494            );
1495
1496            return Some(TerminationReason::CompletionPromise);
1497        }
1498
1499        // Events are ONLY read from the JSONL file written by `ralph emit`.
1500        // This enforces tool use and prevents confabulation (agent claiming to emit without actually doing so).
1501        // See process_events_from_jsonl() for event processing.
1502
1503        // Check termination conditions
1504        self.check_termination()
1505    }
1506
1507    /// Extracts task identifier from build.blocked payload.
1508    /// Uses first line of payload as task ID.
1509    fn extract_task_id(payload: &str) -> String {
1510        payload
1511            .lines()
1512            .next()
1513            .unwrap_or("unknown")
1514            .trim()
1515            .to_string()
1516    }
1517
1518    /// Adds cost to the cumulative total.
1519    pub fn add_cost(&mut self, cost: f64) {
1520        self.state.cumulative_cost += cost;
1521    }
1522
1523    /// Verifies all tasks in scratchpad are complete or cancelled.
1524    ///
1525    /// Returns:
1526    /// - `Ok(true)` if all tasks are `[x]` or `[~]`
1527    /// - `Ok(false)` if any tasks are `[ ]` (pending)
1528    /// - `Err(...)` if scratchpad doesn't exist or can't be read
1529    fn verify_scratchpad_complete(&self) -> Result<bool, std::io::Error> {
1530        let scratchpad_path = self.scratchpad_path();
1531
1532        if !scratchpad_path.exists() {
1533            return Err(std::io::Error::new(
1534                std::io::ErrorKind::NotFound,
1535                "Scratchpad does not exist",
1536            ));
1537        }
1538
1539        let content = std::fs::read_to_string(scratchpad_path)?;
1540
1541        let has_pending = content
1542            .lines()
1543            .any(|line| line.trim_start().starts_with("- [ ]"));
1544
1545        Ok(!has_pending)
1546    }
1547
1548    fn verify_tasks_complete(&self) -> Result<bool, std::io::Error> {
1549        use crate::task_store::TaskStore;
1550
1551        let tasks_path = self.tasks_path();
1552
1553        // No tasks file = no pending tasks = complete
1554        if !tasks_path.exists() {
1555            return Ok(true);
1556        }
1557
1558        let store = TaskStore::load(&tasks_path)?;
1559        Ok(!store.has_pending_tasks())
1560    }
1561
1562    /// Builds a [`CheckinContext`] with current loop state for enhanced Telegram check-ins.
1563    fn build_checkin_context(&self, hat_id: &HatId) -> ralph_telegram::CheckinContext {
1564        let (open_tasks, closed_tasks) = self.count_tasks();
1565        ralph_telegram::CheckinContext {
1566            current_hat: Some(hat_id.as_str().to_string()),
1567            open_tasks,
1568            closed_tasks,
1569            cumulative_cost: self.state.cumulative_cost,
1570        }
1571    }
1572
1573    /// Counts open and closed tasks from the task store.
1574    ///
1575    /// Returns `(open_count, closed_count)`. "Open" means non-terminal tasks,
1576    /// "closed" means tasks with `TaskStatus::Closed`.
1577    fn count_tasks(&self) -> (usize, usize) {
1578        use crate::task::TaskStatus;
1579        use crate::task_store::TaskStore;
1580
1581        let tasks_path = self.tasks_path();
1582        if !tasks_path.exists() {
1583            return (0, 0);
1584        }
1585
1586        match TaskStore::load(&tasks_path) {
1587            Ok(store) => {
1588                let total = store.all().len();
1589                let open = store.open().len();
1590                let closed = total - open;
1591                // Verify: closed should match Closed status count
1592                debug_assert_eq!(
1593                    closed,
1594                    store
1595                        .all()
1596                        .iter()
1597                        .filter(|t| t.status == TaskStatus::Closed)
1598                        .count()
1599                );
1600                (open, closed)
1601            }
1602            Err(_) => (0, 0),
1603        }
1604    }
1605
1606    /// Returns a list of open task descriptions for logging purposes.
1607    fn get_open_task_list(&self) -> Vec<String> {
1608        use crate::task_store::TaskStore;
1609
1610        let tasks_path = self.tasks_path();
1611        if let Ok(store) = TaskStore::load(&tasks_path) {
1612            return store
1613                .open()
1614                .iter()
1615                .map(|t| format!("{}: {}", t.id, t.title))
1616                .collect();
1617        }
1618        vec![]
1619    }
1620
1621    /// Processes events from JSONL and routes orphaned events to Ralph.
1622    ///
1623    /// Also handles backpressure for malformed JSONL lines by:
1624    /// 1. Emitting `event.malformed` system events for each parse failure
1625    /// 2. Tracking consecutive failures for termination check
1626    /// 3. Resetting counter when valid events are parsed
1627    ///
1628    /// Returns true if Ralph should be invoked to handle orphaned events.
1629    pub fn process_events_from_jsonl(&mut self) -> std::io::Result<bool> {
1630        let result = self.event_reader.read_new_events()?;
1631
1632        // Handle malformed lines with backpressure
1633        for malformed in &result.malformed {
1634            let payload = format!(
1635                "Line {}: {}\nContent: {}",
1636                malformed.line_number, malformed.error, &malformed.content
1637            );
1638            let event = Event::new("event.malformed", &payload);
1639            self.bus.publish(event);
1640            self.state.consecutive_malformed_events += 1;
1641            warn!(
1642                line = malformed.line_number,
1643                consecutive = self.state.consecutive_malformed_events,
1644                "Malformed event line detected"
1645            );
1646        }
1647
1648        // Reset counter when valid events are parsed
1649        if !result.events.is_empty() {
1650            self.state.consecutive_malformed_events = 0;
1651        }
1652
1653        if result.events.is_empty() && result.malformed.is_empty() {
1654            return Ok(false);
1655        }
1656
1657        let mut has_orphans = false;
1658
1659        // Validate and transform events (apply backpressure for build.done)
1660        let mut validated_events = Vec::new();
1661        for event in result.events {
1662            let payload = event.payload.clone().unwrap_or_default();
1663
1664            if event.topic == "build.done" {
1665                // Validate build.done events have backpressure evidence
1666                if let Some(evidence) = EventParser::parse_backpressure_evidence(&payload) {
1667                    if evidence.all_passed() {
1668                        validated_events.push(Event::new(event.topic.as_str(), &payload));
1669                    } else {
1670                        // Evidence present but checks failed - synthesize build.blocked
1671                        warn!(
1672                            tests = evidence.tests_passed,
1673                            lint = evidence.lint_passed,
1674                            typecheck = evidence.typecheck_passed,
1675                            "build.done rejected: backpressure checks failed"
1676                        );
1677
1678                        self.diagnostics.log_orchestration(
1679                            self.state.iteration,
1680                            "jsonl",
1681                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1682                                reason: format!(
1683                                    "backpressure checks failed: tests={}, lint={}, typecheck={}",
1684                                    evidence.tests_passed,
1685                                    evidence.lint_passed,
1686                                    evidence.typecheck_passed
1687                                ),
1688                            },
1689                        );
1690
1691                        validated_events.push(Event::new(
1692                            "build.blocked",
1693                            "Backpressure checks failed. Fix tests/lint/typecheck before emitting build.done.",
1694                        ));
1695                    }
1696                } else {
1697                    // No evidence found - synthesize build.blocked
1698                    warn!("build.done rejected: missing backpressure evidence");
1699
1700                    self.diagnostics.log_orchestration(
1701                        self.state.iteration,
1702                        "jsonl",
1703                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1704                            reason: "missing backpressure evidence".to_string(),
1705                        },
1706                    );
1707
1708                    validated_events.push(Event::new(
1709                        "build.blocked",
1710                        "Missing backpressure evidence. Include 'tests: pass', 'lint: pass', 'typecheck: pass' in build.done payload.",
1711                    ));
1712                }
1713            } else if event.topic == "review.done" {
1714                // Validate review.done events have verification evidence
1715                if let Some(evidence) = EventParser::parse_review_evidence(&payload) {
1716                    if evidence.is_verified() {
1717                        validated_events.push(Event::new(event.topic.as_str(), &payload));
1718                    } else {
1719                        // Evidence present but checks failed - synthesize review.blocked
1720                        warn!(
1721                            tests = evidence.tests_passed,
1722                            build = evidence.build_passed,
1723                            "review.done rejected: verification checks failed"
1724                        );
1725
1726                        self.diagnostics.log_orchestration(
1727                            self.state.iteration,
1728                            "jsonl",
1729                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1730                                reason: format!(
1731                                    "review verification failed: tests={}, build={}",
1732                                    evidence.tests_passed, evidence.build_passed
1733                                ),
1734                            },
1735                        );
1736
1737                        validated_events.push(Event::new(
1738                            "review.blocked",
1739                            "Review verification failed. Run tests and build before emitting review.done.",
1740                        ));
1741                    }
1742                } else {
1743                    // No evidence found - synthesize review.blocked
1744                    warn!("review.done rejected: missing verification evidence");
1745
1746                    self.diagnostics.log_orchestration(
1747                        self.state.iteration,
1748                        "jsonl",
1749                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1750                            reason: "missing review verification evidence".to_string(),
1751                        },
1752                    );
1753
1754                    validated_events.push(Event::new(
1755                        "review.blocked",
1756                        "Missing verification evidence. Include 'tests: pass' and 'build: pass' in review.done payload.",
1757                    ));
1758                }
1759            } else {
1760                // Non-build.done/review.done events pass through unchanged
1761                validated_events.push(Event::new(event.topic.as_str(), &payload));
1762            }
1763        }
1764
1765        // Track build.blocked events for thrashing detection
1766        let blocked_events: Vec<_> = validated_events
1767            .iter()
1768            .filter(|e| e.topic == "build.blocked".into())
1769            .collect();
1770
1771        for blocked_event in &blocked_events {
1772            let task_id = Self::extract_task_id(&blocked_event.payload);
1773
1774            let count = self
1775                .state
1776                .task_block_counts
1777                .entry(task_id.clone())
1778                .or_insert(0);
1779            *count += 1;
1780
1781            debug!(
1782                task_id = %task_id,
1783                block_count = *count,
1784                "Task blocked"
1785            );
1786
1787            // After 3 blocks on same task, emit build.task.abandoned
1788            if *count >= 3 && !self.state.abandoned_tasks.contains(&task_id) {
1789                warn!(
1790                    task_id = %task_id,
1791                    "Task abandoned after 3 consecutive blocks"
1792                );
1793
1794                self.state.abandoned_tasks.push(task_id.clone());
1795
1796                self.diagnostics.log_orchestration(
1797                    self.state.iteration,
1798                    "jsonl",
1799                    crate::diagnostics::OrchestrationEvent::TaskAbandoned {
1800                        reason: format!(
1801                            "3 consecutive build.blocked events for task '{}'",
1802                            task_id
1803                        ),
1804                    },
1805                );
1806
1807                let abandoned_event = Event::new(
1808                    "build.task.abandoned",
1809                    format!(
1810                        "Task '{}' abandoned after 3 consecutive build.blocked events",
1811                        task_id
1812                    ),
1813                );
1814
1815                self.bus.publish(abandoned_event);
1816            }
1817        }
1818
1819        // Track hat-level blocking for legacy thrashing detection
1820        let has_blocked_event = !blocked_events.is_empty();
1821
1822        if has_blocked_event {
1823            self.state.consecutive_blocked += 1;
1824        } else {
1825            self.state.consecutive_blocked = 0;
1826            self.state.last_blocked_hat = None;
1827        }
1828
1829        // Handle interact.human blocking behavior:
1830        // When an interact.human event is detected and Telegram service is active,
1831        // send the question and block until human.response or timeout.
1832        let mut response_event = None;
1833        let ask_human_idx = validated_events
1834            .iter()
1835            .position(|e| e.topic == "interact.human".into());
1836
1837        if let Some(idx) = ask_human_idx {
1838            let ask_event = &validated_events[idx];
1839            let payload = ask_event.payload.clone();
1840
1841            if let Some(ref telegram_service) = self.telegram_service {
1842                info!(
1843                    payload = %payload,
1844                    "interact.human event detected — sending question via Telegram"
1845                );
1846
1847                // Send the question (includes retry with exponential backoff)
1848                let send_ok = match telegram_service.send_question(&payload) {
1849                    Ok(_message_id) => true,
1850                    Err(e) => {
1851                        warn!(
1852                            error = %e,
1853                            "Failed to send interact.human question after retries — treating as timeout"
1854                        );
1855                        // Log to diagnostics
1856                        self.diagnostics.log_error(
1857                            self.state.iteration,
1858                            "telegram",
1859                            crate::diagnostics::DiagnosticError::TelegramSendError {
1860                                operation: "send_question".to_string(),
1861                                error: e.to_string(),
1862                                retry_count: ralph_telegram::MAX_SEND_RETRIES,
1863                            },
1864                        );
1865                        false
1866                    }
1867                };
1868
1869                // Block: poll events file for human.response
1870                // Per spec, even on send failure we treat as timeout (continue without blocking)
1871                if send_ok {
1872                    let events_path = self
1873                        .loop_context
1874                        .as_ref()
1875                        .map(|ctx| ctx.events_path())
1876                        .unwrap_or_else(|| PathBuf::from(".ralph/events.jsonl"));
1877
1878                    match telegram_service.wait_for_response(&events_path) {
1879                        Ok(Some(response)) => {
1880                            info!(
1881                                response = %response,
1882                                "Received human.response — continuing loop"
1883                            );
1884                            // Create a human.response event to inject into the bus
1885                            response_event = Some(Event::new("human.response", &response));
1886                        }
1887                        Ok(None) => {
1888                            warn!(
1889                                timeout_secs = telegram_service.timeout_secs(),
1890                                "Human response timeout — continuing without response"
1891                            );
1892                        }
1893                        Err(e) => {
1894                            warn!(
1895                                error = %e,
1896                                "Error waiting for human response — continuing without response"
1897                            );
1898                        }
1899                    }
1900                }
1901            } else {
1902                debug!(
1903                    "interact.human event detected but no Telegram service active — passing through"
1904                );
1905            }
1906        }
1907
1908        // Publish validated events to the bus.
1909        // Ralph is always registered with subscribe("*"), so every event has at least
1910        // one subscriber. Events without a specific hat subscriber are "orphaned" —
1911        // Ralph handles them as the universal fallback.
1912        for event in validated_events {
1913            self.diagnostics.log_orchestration(
1914                self.state.iteration,
1915                "jsonl",
1916                crate::diagnostics::OrchestrationEvent::EventPublished {
1917                    topic: event.topic.to_string(),
1918                },
1919            );
1920
1921            if !self.registry.has_subscriber(event.topic.as_str()) {
1922                has_orphans = true;
1923            }
1924
1925            debug!(
1926                topic = %event.topic,
1927                "Publishing event from JSONL"
1928            );
1929            self.bus.publish(event);
1930        }
1931
1932        // Publish human.response event if one was received during blocking
1933        if let Some(response) = response_event {
1934            info!(
1935                topic = %response.topic,
1936                "Publishing human.response event from Telegram"
1937            );
1938            self.bus.publish(response);
1939        }
1940
1941        Ok(has_orphans)
1942    }
1943
1944    /// Checks if output contains LOOP_COMPLETE from Ralph.
1945    ///
1946    /// Only Ralph can trigger loop completion. Hat outputs are ignored.
1947    pub fn check_ralph_completion(&self, output: &str) -> bool {
1948        EventParser::contains_promise(output, &self.config.event_loop.completion_promise)
1949    }
1950
1951    /// Publishes the loop.terminate system event to observers.
1952    ///
1953    /// Per spec: "Published by the orchestrator (not agents) when the loop exits."
1954    /// This is an observer-only event—hats cannot trigger on it.
1955    ///
1956    /// Returns the event for logging purposes.
1957    pub fn publish_terminate_event(&mut self, reason: &TerminationReason) -> Event {
1958        // Stop the Telegram service if it was running
1959        self.stop_telegram_service();
1960
1961        let elapsed = self.state.elapsed();
1962        let duration_str = format_duration(elapsed);
1963
1964        let payload = format!(
1965            "## Reason\n{}\n\n## Status\n{}\n\n## Summary\n- Iterations: {}\n- Duration: {}\n- Exit code: {}",
1966            reason.as_str(),
1967            termination_status_text(reason),
1968            self.state.iteration,
1969            duration_str,
1970            reason.exit_code()
1971        );
1972
1973        let event = Event::new("loop.terminate", &payload);
1974
1975        // Publish to bus for observers (but no hat can trigger on this)
1976        self.bus.publish(event.clone());
1977
1978        info!(
1979            reason = %reason.as_str(),
1980            iterations = self.state.iteration,
1981            duration = %duration_str,
1982            "Wrapping up: {}. {} iterations in {}.",
1983            reason.as_str(),
1984            self.state.iteration,
1985            duration_str
1986        );
1987
1988        event
1989    }
1990
1991    /// Returns the Telegram service's shutdown flag, if active.
1992    ///
1993    /// Signal handlers can set this flag to interrupt `wait_for_response()`
1994    /// without waiting for the full timeout.
1995    pub fn telegram_shutdown_flag(&self) -> Option<Arc<AtomicBool>> {
1996        self.telegram_service.as_ref().map(|s| s.shutdown_flag())
1997    }
1998
1999    /// Returns a reference to the Telegram service, if active.
2000    pub fn telegram_service(&self) -> Option<&TelegramService> {
2001        self.telegram_service.as_ref()
2002    }
2003
2004    /// Returns a mutable reference to the Telegram service, if active.
2005    pub fn telegram_service_mut(&mut self) -> Option<&mut TelegramService> {
2006        self.telegram_service.as_mut()
2007    }
2008
2009    /// Stops the Telegram service if it's running.
2010    ///
2011    /// Called during loop termination to cleanly shut down the bot.
2012    fn stop_telegram_service(&mut self) {
2013        if let Some(service) = self.telegram_service.take() {
2014            service.stop();
2015        }
2016    }
2017
2018    // -------------------------------------------------------------------------
2019    // Human-in-the-loop planning support
2020    // -------------------------------------------------------------------------
2021
2022    /// Check if any event is a `user.prompt` event.
2023    ///
2024    /// Returns the first user prompt event found, or None.
2025    pub fn check_for_user_prompt(&self, events: &[Event]) -> Option<UserPrompt> {
2026        events
2027            .iter()
2028            .find(|e| e.topic.as_str() == "user.prompt")
2029            .map(|e| UserPrompt {
2030                id: Self::extract_prompt_id(&e.payload),
2031                text: e.payload.clone(),
2032            })
2033    }
2034
2035    /// Extract a prompt ID from the event payload.
2036    ///
2037    /// Supports both XML attribute format: `<event topic="user.prompt" id="q1">...</event>`
2038    /// and JSON format in payload.
2039    fn extract_prompt_id(payload: &str) -> String {
2040        // Try to extract id attribute from XML-like format first
2041        if let Some(start) = payload.find("id=\"")
2042            && let Some(end) = payload[start + 4..].find('"')
2043        {
2044            return payload[start + 4..start + 4 + end].to_string();
2045        }
2046
2047        // Fallback: generate a simple ID based on timestamp
2048        format!("q{}", Self::generate_prompt_id())
2049    }
2050
2051    /// Generate a simple unique ID for prompts.
2052    /// Uses timestamp-based generation since uuid crate isn't available.
2053    fn generate_prompt_id() -> String {
2054        use std::time::{SystemTime, UNIX_EPOCH};
2055        let nanos = SystemTime::now()
2056            .duration_since(UNIX_EPOCH)
2057            .unwrap()
2058            .as_nanos();
2059        format!("{:x}", nanos % 0xFFFF_FFFF)
2060    }
2061}
2062
/// A question from the agent that needs an answer from a human.
///
/// Built from a `user.prompt` event emitted during planning.
#[derive(Debug, Clone)]
pub struct UserPrompt {
    /// Identifier used to match a response to this prompt (e.g., "q1", "q2").
    pub id: String,
    /// Full text of the prompt/question.
    pub text: String,
}
2073
2074/// Error that can occur while waiting for user response.
2075#[derive(Debug, thiserror::Error)]
2076pub enum UserPromptError {
2077    #[error("Timeout waiting for user response")]
2078    Timeout,
2079
2080    #[error("Interrupted while waiting for user response")]
2081    Interrupted,
2082
2083    #[error("I/O error: {0}")]
2084    Io(#[from] std::io::Error),
2085}
2086
2087/// Wait for a user response to a specific prompt (async version).
2088///
2089/// This function polls the conversation file for a matching response entry.
2090/// It's designed to be called from async code when a user.prompt event is detected.
2091///
2092/// # Arguments
2093///
2094/// * `conversation_path` - Path to the conversation JSONL file
2095/// * `prompt_id` - The ID of the prompt we're waiting for
2096/// * `timeout_secs` - Maximum time to wait in seconds
2097/// * `interrupt_rx` - Optional channel to check for interruption
2098///
2099/// # Returns
2100///
2101/// The user's response text if found within the timeout period.
2102#[allow(dead_code)]
2103pub async fn wait_for_user_response_async(
2104    conversation_path: &std::path::Path,
2105    prompt_id: &str,
2106    timeout_secs: u64,
2107    mut interrupt_rx: Option<&mut tokio::sync::watch::Receiver<bool>>,
2108) -> Result<String, UserPromptError> {
2109    use tokio::time::{Duration, sleep, timeout};
2110
2111    let poll_interval = Duration::from_millis(100);
2112
2113    let result = timeout(Duration::from_secs(timeout_secs), async {
2114        loop {
2115            // Check for interruption
2116            if let Some(rx) = &mut interrupt_rx
2117                && *rx.borrow()
2118            {
2119                return Err(UserPromptError::Interrupted);
2120            }
2121
2122            // Poll for response
2123            if let Some(response) = find_response_in_file(conversation_path, prompt_id)? {
2124                return Ok(response);
2125            }
2126
2127            // Wait before next poll
2128            sleep(poll_interval).await;
2129        }
2130    })
2131    .await;
2132
2133    match result {
2134        Ok(r) => r,
2135        Err(_) => Err(UserPromptError::Timeout),
2136    }
2137}
2138
2139/// Wait for a user response to a specific prompt (sync version).
2140///
2141/// This function polls the conversation file for a matching response entry.
2142/// It's designed to be called from the CLI layer when a user.prompt event is detected.
2143///
2144/// # Arguments
2145///
2146/// * `conversation_path` - Path to the conversation JSONL file
2147/// * `prompt_id` - The ID of the prompt we're waiting for
2148/// * `timeout_secs` - Maximum time to wait in seconds
2149///
2150/// # Returns
2151///
2152/// The user's response text if found within the timeout period.
2153#[allow(dead_code)]
2154pub fn wait_for_user_response(
2155    conversation_path: &std::path::Path,
2156    prompt_id: &str,
2157    timeout_secs: u64,
2158) -> Result<String, UserPromptError> {
2159    use std::thread;
2160    use std::time::{Duration, Instant};
2161
2162    let deadline = Instant::now() + Duration::from_secs(timeout_secs);
2163    let poll_interval = Duration::from_millis(100);
2164
2165    loop {
2166        // Check for timeout
2167        if Instant::now() >= deadline {
2168            return Err(UserPromptError::Timeout);
2169        }
2170
2171        // Poll for response
2172        if let Some(response) = find_response_in_file(conversation_path, prompt_id)? {
2173            return Ok(response);
2174        }
2175
2176        // Wait before next poll
2177        thread::sleep(poll_interval);
2178    }
2179}
2180
2181/// Search for a response to a specific prompt in the conversation file.
2182#[allow(dead_code)]
2183fn find_response_in_file(
2184    conversation_path: &std::path::Path,
2185    prompt_id: &str,
2186) -> Result<Option<String>, UserPromptError> {
2187    if !conversation_path.exists() {
2188        return Ok(None);
2189    }
2190
2191    let content = std::fs::read_to_string(conversation_path)?;
2192
2193    for line in content.lines() {
2194        if let Ok(entry) = serde_json::from_str::<crate::planning_session::ConversationEntry>(line)
2195            && entry.entry_type == crate::planning_session::ConversationType::UserResponse
2196            && entry.id == prompt_id
2197        {
2198            return Ok(Some(entry.text));
2199        }
2200    }
2201
2202    Ok(None)
2203}
2204
/// Renders a duration as a compact human-readable string.
///
/// Sub-second precision is dropped. Leading zero units are omitted, giving
/// `"{h}h {m}m {s}s"`, `"{m}m {s}s"` or `"{s}s"`.
fn format_duration(d: Duration) -> String {
    let whole_secs = d.as_secs();
    let (hours, leftover) = (whole_secs / 3600, whole_secs % 3600);
    let (minutes, seconds) = (leftover / 60, leftover % 60);

    match (hours, minutes) {
        (0, 0) => format!("{}s", seconds),
        (0, _) => format!("{}m {}s", minutes, seconds),
        _ => format!("{}h {}m {}s", hours, minutes, seconds),
    }
}
2220
2221/// Returns a human-readable status based on termination reason.
2222fn termination_status_text(reason: &TerminationReason) -> &'static str {
2223    match reason {
2224        TerminationReason::CompletionPromise => "All tasks completed successfully.",
2225        TerminationReason::MaxIterations => "Stopped at iteration limit.",
2226        TerminationReason::MaxRuntime => "Stopped at runtime limit.",
2227        TerminationReason::MaxCost => "Stopped at cost limit.",
2228        TerminationReason::ConsecutiveFailures => "Too many consecutive failures.",
2229        TerminationReason::LoopThrashing => {
2230            "Loop thrashing detected - same hat repeatedly blocked."
2231        }
2232        TerminationReason::ValidationFailure => "Too many consecutive malformed JSONL events.",
2233        TerminationReason::Stopped => "Manually stopped.",
2234        TerminationReason::Interrupted => "Interrupted by signal.",
2235        TerminationReason::ChaosModeComplete => "Chaos mode exploration complete.",
2236        TerminationReason::ChaosModeMaxIterations => "Chaos mode stopped at iteration limit.",
2237        TerminationReason::RestartRequested => "Restarting by human request.",
2238    }
2239}