1mod loop_state;
6#[cfg(test)]
7mod tests;
8
9pub use loop_state::LoopState;
10
11use crate::config::{HatBackend, InjectMode, RalphConfig};
12use crate::event_parser::EventParser;
13use crate::event_reader::EventReader;
14use crate::hat_registry::HatRegistry;
15use crate::hatless_ralph::HatlessRalph;
16use crate::instructions::InstructionBuilder;
17use crate::loop_context::LoopContext;
18use crate::memory_store::{MarkdownMemoryStore, format_memories_as_markdown, truncate_to_budget};
19use ralph_proto::{Event, EventBus, Hat, HatId};
20use std::path::PathBuf;
21use std::time::Duration;
22use tracing::{debug, info, warn};
23
24const MEMORIES_SKILL: &str = include_str!("../../data/memories-skill.md");
30
/// Why the event loop stopped.
///
/// Every variant maps to a stable string label ([`TerminationReason::as_str`])
/// and a process exit code ([`TerminationReason::exit_code`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationReason {
    /// The completion promise was detected.
    CompletionPromise,
    /// The iteration limit was reached.
    MaxIterations,
    /// The runtime limit was reached.
    MaxRuntime,
    /// The cost limit was reached.
    MaxCost,
    /// Too many consecutive failures.
    ConsecutiveFailures,
    /// Loop thrashing was detected.
    LoopThrashing,
    /// Too many consecutive malformed events.
    ValidationFailure,
    /// The loop was stopped.
    Stopped,
    /// The loop was interrupted.
    Interrupted,
    /// Chaos mode finished.
    ChaosModeComplete,
    /// Chaos mode hit its iteration limit.
    ChaosModeMaxIterations,
}

impl TerminationReason {
    /// Returns the process exit code for this reason.
    ///
    /// * `0` - completion (`CompletionPromise`, `ChaosModeComplete`)
    /// * `1` - failure or stop (`ConsecutiveFailures`, `LoopThrashing`,
    ///   `ValidationFailure`, `Stopped`)
    /// * `2` - a limit was hit (`MaxIterations`, `MaxRuntime`, `MaxCost`,
    ///   `ChaosModeMaxIterations`)
    /// * `130` - `Interrupted`
    pub fn exit_code(&self) -> i32 {
        match self {
            Self::Interrupted => 130,
            Self::MaxIterations
            | Self::MaxRuntime
            | Self::MaxCost
            | Self::ChaosModeMaxIterations => 2,
            Self::ConsecutiveFailures
            | Self::LoopThrashing
            | Self::ValidationFailure
            | Self::Stopped => 1,
            Self::CompletionPromise | Self::ChaosModeComplete => 0,
        }
    }

    /// Returns the snake_case label used when reporting this reason.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::CompletionPromise => "completed",
            Self::MaxIterations => "max_iterations",
            Self::MaxRuntime => "max_runtime",
            Self::MaxCost => "max_cost",
            Self::ConsecutiveFailures => "consecutive_failures",
            Self::LoopThrashing => "loop_thrashing",
            Self::ValidationFailure => "validation_failure",
            Self::Stopped => "stopped",
            Self::Interrupted => "interrupted",
            Self::ChaosModeComplete => "chaos_complete",
            Self::ChaosModeMaxIterations => "chaos_max_iterations",
        }
    }

    /// Returns `true` for `CompletionPromise` and `ChaosModeComplete` only.
    pub fn is_success(&self) -> bool {
        *self == Self::CompletionPromise || *self == Self::ChaosModeComplete
    }

    /// Returns `true` only for `CompletionPromise`.
    pub fn triggers_chaos_mode(&self) -> bool {
        *self == Self::CompletionPromise
    }
}
116
117pub struct EventLoop {
119 config: RalphConfig,
120 registry: HatRegistry,
121 bus: EventBus,
122 state: LoopState,
123 instruction_builder: InstructionBuilder,
124 ralph: HatlessRalph,
125 pub(crate) event_reader: EventReader,
128 diagnostics: crate::diagnostics::DiagnosticsCollector,
129 loop_context: Option<LoopContext>,
131}
132
133impl EventLoop {
134 pub fn new(config: RalphConfig) -> Self {
136 let diagnostics = crate::diagnostics::DiagnosticsCollector::new(std::path::Path::new("."))
139 .unwrap_or_else(|e| {
140 debug!(
141 "Failed to initialize diagnostics: {}, using disabled collector",
142 e
143 );
144 crate::diagnostics::DiagnosticsCollector::disabled()
145 });
146
147 Self::with_diagnostics(config, diagnostics)
148 }
149
150 pub fn with_context(config: RalphConfig, context: LoopContext) -> Self {
156 let diagnostics = crate::diagnostics::DiagnosticsCollector::new(context.workspace())
157 .unwrap_or_else(|e| {
158 debug!(
159 "Failed to initialize diagnostics: {}, using disabled collector",
160 e
161 );
162 crate::diagnostics::DiagnosticsCollector::disabled()
163 });
164
165 Self::with_context_and_diagnostics(config, context, diagnostics)
166 }
167
168 pub fn with_context_and_diagnostics(
170 config: RalphConfig,
171 context: LoopContext,
172 diagnostics: crate::diagnostics::DiagnosticsCollector,
173 ) -> Self {
174 let registry = HatRegistry::from_config(&config);
175 let instruction_builder = InstructionBuilder::with_events(
176 &config.event_loop.completion_promise,
177 config.core.clone(),
178 config.events.clone(),
179 );
180
181 let mut bus = EventBus::new();
182
183 for hat in registry.all() {
187 bus.register(hat.clone());
188 }
189
190 let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); bus.register(ralph_hat);
194
195 if registry.is_empty() {
196 debug!("Solo mode: Ralph is the only coordinator");
197 } else {
198 debug!(
199 "Multi-hat mode: {} custom hats + Ralph as fallback",
200 registry.len()
201 );
202 }
203
204 let ralph = HatlessRalph::new(
206 config.event_loop.completion_promise.clone(),
207 config.core.clone(),
208 ®istry,
209 config.event_loop.starting_event.clone(),
210 )
211 .with_memories_enabled(config.memories.enabled);
212
213 let events_path = std::fs::read_to_string(context.current_events_marker())
217 .map(|s| {
218 let relative = s.trim();
219 context.workspace().join(relative)
220 })
221 .unwrap_or_else(|_| context.events_path());
222 let event_reader = EventReader::new(&events_path);
223
224 Self {
225 config,
226 registry,
227 bus,
228 state: LoopState::new(),
229 instruction_builder,
230 ralph,
231 event_reader,
232 diagnostics,
233 loop_context: Some(context),
234 }
235 }
236
237 pub fn with_diagnostics(
239 config: RalphConfig,
240 diagnostics: crate::diagnostics::DiagnosticsCollector,
241 ) -> Self {
242 let registry = HatRegistry::from_config(&config);
243 let instruction_builder = InstructionBuilder::with_events(
244 &config.event_loop.completion_promise,
245 config.core.clone(),
246 config.events.clone(),
247 );
248
249 let mut bus = EventBus::new();
250
251 for hat in registry.all() {
255 bus.register(hat.clone());
256 }
257
258 let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); bus.register(ralph_hat);
262
263 if registry.is_empty() {
264 debug!("Solo mode: Ralph is the only coordinator");
265 } else {
266 debug!(
267 "Multi-hat mode: {} custom hats + Ralph as fallback",
268 registry.len()
269 );
270 }
271
272 let ralph = HatlessRalph::new(
274 config.event_loop.completion_promise.clone(),
275 config.core.clone(),
276 ®istry,
277 config.event_loop.starting_event.clone(),
278 )
279 .with_memories_enabled(config.memories.enabled);
280
281 let events_path = std::fs::read_to_string(".ralph/current-events")
284 .map(|s| s.trim().to_string())
285 .unwrap_or_else(|_| ".ralph/events.jsonl".to_string());
286 let event_reader = EventReader::new(&events_path);
287
288 Self {
289 config,
290 registry,
291 bus,
292 state: LoopState::new(),
293 instruction_builder,
294 ralph,
295 event_reader,
296 diagnostics,
297 loop_context: None,
298 }
299 }
300
301 pub fn loop_context(&self) -> Option<&LoopContext> {
303 self.loop_context.as_ref()
304 }
305
306 fn tasks_path(&self) -> PathBuf {
308 self.loop_context
309 .as_ref()
310 .map(|ctx| ctx.tasks_path())
311 .unwrap_or_else(|| PathBuf::from(".ralph/agent/tasks.jsonl"))
312 }
313
314 fn scratchpad_path(&self) -> PathBuf {
316 self.loop_context
317 .as_ref()
318 .map(|ctx| ctx.scratchpad_path())
319 .unwrap_or_else(|| PathBuf::from(&self.config.core.scratchpad))
320 }
321
322 pub fn state(&self) -> &LoopState {
324 &self.state
325 }
326
327 pub fn config(&self) -> &RalphConfig {
329 &self.config
330 }
331
332 pub fn registry(&self) -> &HatRegistry {
334 &self.registry
335 }
336
337 pub fn get_hat_backend(&self, hat_id: &HatId) -> Option<&HatBackend> {
342 self.registry
343 .get_config(hat_id)
344 .and_then(|config| config.backend.as_ref())
345 }
346
347 pub fn add_observer<F>(&mut self, observer: F)
352 where
353 F: Fn(&Event) + Send + 'static,
354 {
355 self.bus.add_observer(observer);
356 }
357
358 #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
362 pub fn set_observer<F>(&mut self, observer: F)
363 where
364 F: Fn(&Event) + Send + 'static,
365 {
366 #[allow(deprecated)]
367 self.bus.set_observer(observer);
368 }
369
370 pub fn check_termination(&self) -> Option<TerminationReason> {
372 let cfg = &self.config.event_loop;
373
374 if self.state.iteration >= cfg.max_iterations {
375 return Some(TerminationReason::MaxIterations);
376 }
377
378 if self.state.elapsed().as_secs() >= cfg.max_runtime_seconds {
379 return Some(TerminationReason::MaxRuntime);
380 }
381
382 if let Some(max_cost) = cfg.max_cost_usd
383 && self.state.cumulative_cost >= max_cost
384 {
385 return Some(TerminationReason::MaxCost);
386 }
387
388 if self.state.consecutive_failures >= cfg.max_consecutive_failures {
389 return Some(TerminationReason::ConsecutiveFailures);
390 }
391
392 if self.state.abandoned_task_redispatches >= 3 {
394 return Some(TerminationReason::LoopThrashing);
395 }
396
397 if self.state.consecutive_malformed_events >= 3 {
399 return Some(TerminationReason::ValidationFailure);
400 }
401
402 None
403 }
404
405 pub fn initialize(&mut self, prompt_content: &str) {
407 let topic = self
409 .config
410 .event_loop
411 .starting_event
412 .clone()
413 .unwrap_or_else(|| "task.start".to_string());
414 self.initialize_with_topic(&topic, prompt_content);
415 }
416
417 pub fn initialize_resume(&mut self, prompt_content: &str) {
422 self.initialize_with_topic("task.resume", prompt_content);
424 }
425
426 fn initialize_with_topic(&mut self, topic: &str, prompt_content: &str) {
428 self.ralph.set_objective(prompt_content.to_string());
432
433 let start_event = Event::new(topic, prompt_content);
434 self.bus.publish(start_event);
435 debug!(topic = topic, "Published {} event", topic);
436 }
437
438 pub fn next_hat(&self) -> Option<&HatId> {
447 let next = self.bus.next_hat_with_pending();
448
449 next.as_ref()?;
451
452 if self.registry.is_empty() {
455 next
457 } else {
458 self.bus.hat_ids().find(|id| id.as_str() == "ralph")
461 }
462 }
463
464 pub fn has_pending_events(&self) -> bool {
469 self.bus.next_hat_with_pending().is_some()
470 }
471
472 pub fn get_hat_publishes(&self, hat_id: &HatId) -> Vec<String> {
476 self.registry
477 .get(hat_id)
478 .map(|hat| hat.publishes.iter().map(|t| t.to_string()).collect())
479 .unwrap_or_default()
480 }
481
482 pub fn inject_fallback_event(&mut self) -> bool {
489 let fallback_event = Event::new(
490 "task.resume",
491 "RECOVERY: Previous iteration did not publish an event. \
492 Review the scratchpad and either dispatch the next task or complete the loop.",
493 );
494
495 let fallback_event = match &self.state.last_hat {
498 Some(hat_id) if hat_id.as_str() != "ralph" => {
499 debug!(
500 hat = %hat_id.as_str(),
501 "Injecting fallback event to recover - targeting last hat with task.resume"
502 );
503 fallback_event.with_target(hat_id.clone())
504 }
505 _ => {
506 debug!("Injecting fallback event to recover - triggering Ralph with task.resume");
507 fallback_event
508 }
509 };
510
511 self.bus.publish(fallback_event);
512 true
513 }
514
515 pub fn build_prompt(&mut self, hat_id: &HatId) -> Option<String> {
528 if hat_id.as_str() == "ralph" {
531 if self.registry.is_empty() {
532 let events = self.bus.take_pending(&hat_id.clone());
534 let events_context = events
535 .iter()
536 .map(|e| Self::format_event(e))
537 .collect::<Vec<_>>()
538 .join("\n");
539
540 let base_prompt = self.ralph.build_prompt(&events_context, &[]);
542 let final_prompt = self.prepend_memories(base_prompt);
543
544 debug!("build_prompt: routing to HatlessRalph (solo mode)");
545 return Some(final_prompt);
546 } else {
547 let mut all_hat_ids: Vec<HatId> = self.bus.hat_ids().cloned().collect();
549 all_hat_ids.sort_by(|a, b| a.as_str().cmp(b.as_str()));
551
552 let mut all_events = Vec::new();
553 let mut system_events = Vec::new();
554
555 for id in &all_hat_ids {
556 let pending = self.bus.take_pending(id);
557 if pending.is_empty() {
558 continue;
559 }
560
561 let (drop_pending, exhausted_event) = self.check_hat_exhaustion(id, &pending);
562 if drop_pending {
563 if let Some(exhausted_event) = exhausted_event {
565 all_events.push(exhausted_event.clone());
566 system_events.push(exhausted_event);
567 }
568 continue;
569 }
570
571 all_events.extend(pending);
572 }
573
574 for event in system_events {
577 self.bus.publish(event);
578 }
579
580 let active_hat_ids = self.determine_active_hat_ids(&all_events);
582 self.record_hat_activations(&active_hat_ids);
583 let active_hats = self.determine_active_hats(&all_events);
584
585 let events_context = all_events
587 .iter()
588 .map(|e| Self::format_event(e))
589 .collect::<Vec<_>>()
590 .join("\n");
591
592 let base_prompt = self.ralph.build_prompt(&events_context, &active_hats);
594 let final_prompt = self.prepend_memories(base_prompt);
595
596 debug!(
598 "build_prompt: routing to HatlessRalph (multi-hat coordinator mode), active_hats: {:?}",
599 active_hats
600 .iter()
601 .map(|h| h.id.as_str())
602 .collect::<Vec<_>>()
603 );
604 return Some(final_prompt);
605 }
606 }
607
608 let events = self.bus.take_pending(&hat_id.clone());
612 let events_context = events
613 .iter()
614 .map(|e| Self::format_event(e))
615 .collect::<Vec<_>>()
616 .join("\n");
617
618 let hat = self.registry.get(hat_id)?;
619
620 debug!(
622 "build_prompt: hat_id='{}', instructions.is_empty()={}",
623 hat_id.as_str(),
624 hat.instructions.is_empty()
625 );
626
627 debug!(
629 "build_prompt: routing to build_custom_hat() for '{}'",
630 hat_id.as_str()
631 );
632 Some(
633 self.instruction_builder
634 .build_custom_hat(hat, &events_context),
635 )
636 }
637
638 fn prepend_memories(&self, prompt: String) -> String {
643 let memories_config = &self.config.memories;
644
645 info!(
646 "Memory injection check: enabled={}, inject={:?}, workspace_root={:?}",
647 memories_config.enabled, memories_config.inject, self.config.core.workspace_root
648 );
649
650 if !memories_config.enabled || memories_config.inject != InjectMode::Auto {
652 info!(
653 "Memory injection skipped: enabled={}, inject={:?}",
654 memories_config.enabled, memories_config.inject
655 );
656 return prompt;
657 }
658
659 let workspace_root = &self.config.core.workspace_root;
661 let store = MarkdownMemoryStore::with_default_path(workspace_root);
662 let memories_path = workspace_root.join(".ralph/agent/memories.md");
663
664 info!(
665 "Looking for memories at: {:?} (exists: {})",
666 memories_path,
667 memories_path.exists()
668 );
669
670 let memories = match store.load() {
671 Ok(memories) => {
672 info!("Successfully loaded {} memories from store", memories.len());
673 memories
674 }
675 Err(e) => {
676 info!(
677 "Failed to load memories for injection: {} (path: {:?})",
678 e, memories_path
679 );
680 return prompt;
681 }
682 };
683
684 if memories.is_empty() {
685 info!("Memory store is empty - no memories to inject");
686 return prompt;
687 }
688
689 let mut memories_content = format_memories_as_markdown(&memories);
691
692 if memories_config.budget > 0 {
694 let original_len = memories_content.len();
695 memories_content = truncate_to_budget(&memories_content, memories_config.budget);
696 debug!(
697 "Applied budget: {} chars -> {} chars (budget: {})",
698 original_len,
699 memories_content.len(),
700 memories_config.budget
701 );
702 }
703
704 info!(
705 "Injecting {} memories ({} chars) into prompt",
706 memories.len(),
707 memories_content.len()
708 );
709
710 let mut final_prompt = memories_content;
712
713 final_prompt.push_str(MEMORIES_SKILL);
715 debug!("Added memory usage skill to prompt");
716
717 final_prompt.push_str("\n\n");
718 final_prompt.push_str(&prompt);
719
720 final_prompt
721 }
722
723 pub fn build_ralph_prompt(&self, prompt_content: &str) -> String {
725 self.ralph.build_prompt(prompt_content, &[])
726 }
727
728 fn determine_active_hats(&self, events: &[Event]) -> Vec<&Hat> {
731 let mut active_hats = Vec::new();
732 for id in self.determine_active_hat_ids(events) {
733 if let Some(hat) = self.registry.get(&id) {
734 active_hats.push(hat);
735 }
736 }
737 active_hats
738 }
739
740 fn determine_active_hat_ids(&self, events: &[Event]) -> Vec<HatId> {
741 let mut active_hat_ids = Vec::new();
742 for event in events {
743 if let Some(hat) = self.registry.get_for_topic(event.topic.as_str()) {
744 if !active_hat_ids.iter().any(|id| id == &hat.id) {
746 active_hat_ids.push(hat.id.clone());
747 }
748 }
749 }
750 active_hat_ids
751 }
752
753 fn format_event(event: &Event) -> String {
758 let topic = &event.topic;
759 let payload = &event.payload;
760
761 if topic.as_str() == "task.start" || topic.as_str() == "task.resume" {
762 format!(
763 "Event: {} - <top-level-prompt>\n{}\n</top-level-prompt>",
764 topic, payload
765 )
766 } else {
767 format!("Event: {} - {}", topic, payload)
768 }
769 }
770
771 fn check_hat_exhaustion(&mut self, hat_id: &HatId, dropped: &[Event]) -> (bool, Option<Event>) {
772 let Some(config) = self.registry.get_config(hat_id) else {
773 return (false, None);
774 };
775 let Some(max) = config.max_activations else {
776 return (false, None);
777 };
778
779 let count = *self.state.hat_activation_counts.get(hat_id).unwrap_or(&0);
780 if count < max {
781 return (false, None);
782 }
783
784 let should_emit = self.state.exhausted_hats.insert(hat_id.clone());
786
787 if !should_emit {
788 return (true, None);
790 }
791
792 let mut dropped_topics: Vec<String> = dropped.iter().map(|e| e.topic.to_string()).collect();
793 dropped_topics.sort();
794
795 let payload = format!(
796 "Hat '{hat}' exhausted.\n- max_activations: {max}\n- activations: {count}\n- dropped_topics:\n - {topics}",
797 hat = hat_id.as_str(),
798 max = max,
799 count = count,
800 topics = dropped_topics.join("\n - ")
801 );
802
803 warn!(
804 hat = %hat_id.as_str(),
805 max_activations = max,
806 activations = count,
807 "Hat exhausted (max_activations reached)"
808 );
809
810 (
811 true,
812 Some(Event::new(
813 format!("{}.exhausted", hat_id.as_str()),
814 payload,
815 )),
816 )
817 }
818
819 fn record_hat_activations(&mut self, active_hat_ids: &[HatId]) {
820 for hat_id in active_hat_ids {
821 *self
822 .state
823 .hat_activation_counts
824 .entry(hat_id.clone())
825 .or_insert(0) += 1;
826 }
827 }
828
829 pub fn get_active_hat_id(&self) -> HatId {
832 for hat_id in self.bus.hat_ids() {
834 let Some(events) = self.bus.peek_pending(hat_id) else {
835 continue;
836 };
837 let Some(event) = events.first() else {
838 continue;
839 };
840 if let Some(active_hat) = self.registry.get_for_topic(event.topic.as_str()) {
841 return active_hat.id.clone();
842 }
843 }
844 HatId::new("ralph")
845 }
846
847 pub fn record_event_count(&mut self) -> usize {
852 self.event_reader
853 .read_new_events()
854 .map(|r| r.events.len())
855 .unwrap_or(0)
856 }
857
858 pub fn check_default_publishes(&mut self, hat_id: &HatId, _events_before: usize) {
864 let events_after = self
865 .event_reader
866 .read_new_events()
867 .map(|r| r.events.len())
868 .unwrap_or(0);
869
870 if events_after == 0
871 && let Some(config) = self.registry.get_config(hat_id)
872 && let Some(default_topic) = &config.default_publishes
873 {
874 let default_event = Event::new(default_topic.as_str(), "").with_source(hat_id.clone());
876
877 debug!(
878 hat = %hat_id.as_str(),
879 topic = %default_topic,
880 "No events written by hat, injecting default_publishes event"
881 );
882
883 self.bus.publish(default_event);
884 }
885 }
886
887 pub fn bus(&mut self) -> &mut EventBus {
892 &mut self.bus
893 }
894
895 pub fn process_output(
899 &mut self,
900 hat_id: &HatId,
901 output: &str,
902 success: bool,
903 ) -> Option<TerminationReason> {
904 self.state.iteration += 1;
905 self.state.last_hat = Some(hat_id.clone());
906
907 self.diagnostics.log_orchestration(
909 self.state.iteration,
910 "loop",
911 crate::diagnostics::OrchestrationEvent::IterationStarted,
912 );
913
914 self.diagnostics.log_orchestration(
916 self.state.iteration,
917 "loop",
918 crate::diagnostics::OrchestrationEvent::HatSelected {
919 hat: hat_id.to_string(),
920 reason: "process_output".to_string(),
921 },
922 );
923
924 if success {
926 self.state.consecutive_failures = 0;
927 } else {
928 self.state.consecutive_failures += 1;
929 }
930
931 if hat_id.as_str() == "ralph"
935 && EventParser::contains_promise(output, &self.config.event_loop.completion_promise)
936 {
937 if self.config.memories.enabled {
939 if let Ok(false) = self.verify_tasks_complete() {
940 let open_tasks = self.get_open_task_list();
941 warn!(
942 open_tasks = ?open_tasks,
943 "LOOP_COMPLETE with {} open task(s) - trusting agent decision",
944 open_tasks.len()
945 );
946 }
947 } else if let Ok(false) = self.verify_scratchpad_complete() {
948 warn!("LOOP_COMPLETE with pending scratchpad tasks - trusting agent decision");
949 }
950
951 info!("LOOP_COMPLETE detected - terminating");
953
954 self.diagnostics.log_orchestration(
956 self.state.iteration,
957 "loop",
958 crate::diagnostics::OrchestrationEvent::LoopTerminated {
959 reason: "completion_promise".to_string(),
960 },
961 );
962
963 return Some(TerminationReason::CompletionPromise);
964 }
965
966 self.check_termination()
972 }
973
974 fn extract_task_id(payload: &str) -> String {
977 payload
978 .lines()
979 .next()
980 .unwrap_or("unknown")
981 .trim()
982 .to_string()
983 }
984
985 pub fn add_cost(&mut self, cost: f64) {
987 self.state.cumulative_cost += cost;
988 }
989
990 fn verify_scratchpad_complete(&self) -> Result<bool, std::io::Error> {
997 let scratchpad_path = self.scratchpad_path();
998
999 if !scratchpad_path.exists() {
1000 return Err(std::io::Error::new(
1001 std::io::ErrorKind::NotFound,
1002 "Scratchpad does not exist",
1003 ));
1004 }
1005
1006 let content = std::fs::read_to_string(scratchpad_path)?;
1007
1008 let has_pending = content
1009 .lines()
1010 .any(|line| line.trim_start().starts_with("- [ ]"));
1011
1012 Ok(!has_pending)
1013 }
1014
1015 fn verify_tasks_complete(&self) -> Result<bool, std::io::Error> {
1016 use crate::task_store::TaskStore;
1017
1018 let tasks_path = self.tasks_path();
1019
1020 if !tasks_path.exists() {
1022 return Ok(true);
1023 }
1024
1025 let store = TaskStore::load(&tasks_path)?;
1026 Ok(!store.has_pending_tasks())
1027 }
1028
1029 fn get_open_task_list(&self) -> Vec<String> {
1031 use crate::task_store::TaskStore;
1032
1033 let tasks_path = self.tasks_path();
1034 if let Ok(store) = TaskStore::load(&tasks_path) {
1035 return store
1036 .open()
1037 .iter()
1038 .map(|t| format!("{}: {}", t.id, t.title))
1039 .collect();
1040 }
1041 vec![]
1042 }
1043
1044 pub fn process_events_from_jsonl(&mut self) -> std::io::Result<bool> {
1053 let result = self.event_reader.read_new_events()?;
1054
1055 for malformed in &result.malformed {
1057 let payload = format!(
1058 "Line {}: {}\nContent: {}",
1059 malformed.line_number, malformed.error, &malformed.content
1060 );
1061 let event = Event::new("event.malformed", &payload);
1062 self.bus.publish(event);
1063 self.state.consecutive_malformed_events += 1;
1064 warn!(
1065 line = malformed.line_number,
1066 consecutive = self.state.consecutive_malformed_events,
1067 "Malformed event line detected"
1068 );
1069 }
1070
1071 if !result.events.is_empty() {
1073 self.state.consecutive_malformed_events = 0;
1074 }
1075
1076 if result.events.is_empty() && result.malformed.is_empty() {
1077 return Ok(false);
1078 }
1079
1080 let mut has_orphans = false;
1081
1082 let mut validated_events = Vec::new();
1084 for event in result.events {
1085 let payload = event.payload.clone().unwrap_or_default();
1086
1087 if event.topic == "build.done" {
1088 if let Some(evidence) = EventParser::parse_backpressure_evidence(&payload) {
1090 if evidence.all_passed() {
1091 validated_events.push(Event::new(event.topic.as_str(), &payload));
1092 } else {
1093 warn!(
1095 tests = evidence.tests_passed,
1096 lint = evidence.lint_passed,
1097 typecheck = evidence.typecheck_passed,
1098 "build.done rejected: backpressure checks failed"
1099 );
1100
1101 self.diagnostics.log_orchestration(
1102 self.state.iteration,
1103 "jsonl",
1104 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1105 reason: format!(
1106 "backpressure checks failed: tests={}, lint={}, typecheck={}",
1107 evidence.tests_passed,
1108 evidence.lint_passed,
1109 evidence.typecheck_passed
1110 ),
1111 },
1112 );
1113
1114 validated_events.push(Event::new(
1115 "build.blocked",
1116 "Backpressure checks failed. Fix tests/lint/typecheck before emitting build.done.",
1117 ));
1118 }
1119 } else {
1120 warn!("build.done rejected: missing backpressure evidence");
1122
1123 self.diagnostics.log_orchestration(
1124 self.state.iteration,
1125 "jsonl",
1126 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1127 reason: "missing backpressure evidence".to_string(),
1128 },
1129 );
1130
1131 validated_events.push(Event::new(
1132 "build.blocked",
1133 "Missing backpressure evidence. Include 'tests: pass', 'lint: pass', 'typecheck: pass' in build.done payload.",
1134 ));
1135 }
1136 } else {
1137 validated_events.push(Event::new(event.topic.as_str(), &payload));
1139 }
1140 }
1141
1142 let blocked_events: Vec<_> = validated_events
1144 .iter()
1145 .filter(|e| e.topic == "build.blocked".into())
1146 .collect();
1147
1148 for blocked_event in &blocked_events {
1149 let task_id = Self::extract_task_id(&blocked_event.payload);
1150
1151 let count = self
1152 .state
1153 .task_block_counts
1154 .entry(task_id.clone())
1155 .or_insert(0);
1156 *count += 1;
1157
1158 debug!(
1159 task_id = %task_id,
1160 block_count = *count,
1161 "Task blocked"
1162 );
1163
1164 if *count >= 3 && !self.state.abandoned_tasks.contains(&task_id) {
1166 warn!(
1167 task_id = %task_id,
1168 "Task abandoned after 3 consecutive blocks"
1169 );
1170
1171 self.state.abandoned_tasks.push(task_id.clone());
1172
1173 self.diagnostics.log_orchestration(
1174 self.state.iteration,
1175 "jsonl",
1176 crate::diagnostics::OrchestrationEvent::TaskAbandoned {
1177 reason: format!(
1178 "3 consecutive build.blocked events for task '{}'",
1179 task_id
1180 ),
1181 },
1182 );
1183
1184 let abandoned_event = Event::new(
1185 "build.task.abandoned",
1186 format!(
1187 "Task '{}' abandoned after 3 consecutive build.blocked events",
1188 task_id
1189 ),
1190 );
1191
1192 self.bus.publish(abandoned_event);
1193 }
1194 }
1195
1196 let has_blocked_event = !blocked_events.is_empty();
1198
1199 if has_blocked_event {
1200 self.state.consecutive_blocked += 1;
1201 } else {
1202 self.state.consecutive_blocked = 0;
1203 self.state.last_blocked_hat = None;
1204 }
1205
1206 for event in validated_events {
1208 self.diagnostics.log_orchestration(
1210 self.state.iteration,
1211 "jsonl",
1212 crate::diagnostics::OrchestrationEvent::EventPublished {
1213 topic: event.topic.to_string(),
1214 },
1215 );
1216
1217 if self.registry.has_subscriber(event.topic.as_str()) {
1219 debug!(
1220 topic = %event.topic,
1221 "Publishing event from JSONL"
1222 );
1223 self.bus.publish(event);
1224 } else {
1225 debug!(
1227 topic = %event.topic,
1228 "Event has no subscriber - will be handled by Ralph"
1229 );
1230 has_orphans = true;
1231 }
1232 }
1233
1234 Ok(has_orphans)
1235 }
1236
1237 pub fn check_ralph_completion(&self, output: &str) -> bool {
1241 EventParser::contains_promise(output, &self.config.event_loop.completion_promise)
1242 }
1243
1244 pub fn publish_terminate_event(&mut self, reason: &TerminationReason) -> Event {
1251 let elapsed = self.state.elapsed();
1252 let duration_str = format_duration(elapsed);
1253
1254 let payload = format!(
1255 "## Reason\n{}\n\n## Status\n{}\n\n## Summary\n- Iterations: {}\n- Duration: {}\n- Exit code: {}",
1256 reason.as_str(),
1257 termination_status_text(reason),
1258 self.state.iteration,
1259 duration_str,
1260 reason.exit_code()
1261 );
1262
1263 let event = Event::new("loop.terminate", &payload);
1264
1265 self.bus.publish(event.clone());
1267
1268 info!(
1269 reason = %reason.as_str(),
1270 iterations = self.state.iteration,
1271 duration = %duration_str,
1272 "Wrapping up: {}. {} iterations in {}.",
1273 reason.as_str(),
1274 self.state.iteration,
1275 duration_str
1276 );
1277
1278 event
1279 }
1280
1281 pub fn check_for_user_prompt(&self, events: &[Event]) -> Option<UserPrompt> {
1289 events
1290 .iter()
1291 .find(|e| e.topic.as_str() == "user.prompt")
1292 .map(|e| UserPrompt {
1293 id: Self::extract_prompt_id(&e.payload),
1294 text: e.payload.clone(),
1295 })
1296 }
1297
1298 fn extract_prompt_id(payload: &str) -> String {
1303 if let Some(start) = payload.find("id=\"")
1305 && let Some(end) = payload[start + 4..].find('"')
1306 {
1307 return payload[start + 4..start + 4 + end].to_string();
1308 }
1309
1310 format!("q{}", Self::generate_prompt_id())
1312 }
1313
1314 fn generate_prompt_id() -> String {
1317 use std::time::{SystemTime, UNIX_EPOCH};
1318 let nanos = SystemTime::now()
1319 .duration_since(UNIX_EPOCH)
1320 .unwrap()
1321 .as_nanos();
1322 format!("{:x}", nanos % 0xFFFF_FFFF)
1323 }
1324}
1325
/// A prompt addressed to the user, built from a `user.prompt` event.
#[derive(Debug, Clone)]
pub struct UserPrompt {
    /// Identifier of the prompt; responses are matched against it.
    pub id: String,
    /// Full payload of the originating event.
    pub text: String,
}
1336
1337#[derive(Debug, thiserror::Error)]
1339pub enum UserPromptError {
1340 #[error("Timeout waiting for user response")]
1341 Timeout,
1342
1343 #[error("Interrupted while waiting for user response")]
1344 Interrupted,
1345
1346 #[error("I/O error: {0}")]
1347 Io(#[from] std::io::Error),
1348}
1349
1350#[allow(dead_code)]
1366pub async fn wait_for_user_response_async(
1367 conversation_path: &std::path::Path,
1368 prompt_id: &str,
1369 timeout_secs: u64,
1370 mut interrupt_rx: Option<&mut tokio::sync::watch::Receiver<bool>>,
1371) -> Result<String, UserPromptError> {
1372 use tokio::time::{Duration, sleep, timeout};
1373
1374 let poll_interval = Duration::from_millis(100);
1375
1376 let result = timeout(Duration::from_secs(timeout_secs), async {
1377 loop {
1378 if let Some(rx) = &mut interrupt_rx
1380 && *rx.borrow()
1381 {
1382 return Err(UserPromptError::Interrupted);
1383 }
1384
1385 if let Some(response) = find_response_in_file(conversation_path, prompt_id)? {
1387 return Ok(response);
1388 }
1389
1390 sleep(poll_interval).await;
1392 }
1393 })
1394 .await;
1395
1396 match result {
1397 Ok(r) => r,
1398 Err(_) => Err(UserPromptError::Timeout),
1399 }
1400}
1401
1402#[allow(dead_code)]
1417pub fn wait_for_user_response(
1418 conversation_path: &std::path::Path,
1419 prompt_id: &str,
1420 timeout_secs: u64,
1421) -> Result<String, UserPromptError> {
1422 use std::thread;
1423 use std::time::{Duration, Instant};
1424
1425 let deadline = Instant::now() + Duration::from_secs(timeout_secs);
1426 let poll_interval = Duration::from_millis(100);
1427
1428 loop {
1429 if Instant::now() >= deadline {
1431 return Err(UserPromptError::Timeout);
1432 }
1433
1434 if let Some(response) = find_response_in_file(conversation_path, prompt_id)? {
1436 return Ok(response);
1437 }
1438
1439 thread::sleep(poll_interval);
1441 }
1442}
1443
1444#[allow(dead_code)]
1446fn find_response_in_file(
1447 conversation_path: &std::path::Path,
1448 prompt_id: &str,
1449) -> Result<Option<String>, UserPromptError> {
1450 if !conversation_path.exists() {
1451 return Ok(None);
1452 }
1453
1454 let content = std::fs::read_to_string(conversation_path)?;
1455
1456 for line in content.lines() {
1457 if let Ok(entry) = serde_json::from_str::<crate::planning_session::ConversationEntry>(line)
1458 && entry.entry_type == crate::planning_session::ConversationType::UserResponse
1459 && entry.id == prompt_id
1460 {
1461 return Ok(Some(entry.text));
1462 }
1463 }
1464
1465 Ok(None)
1466}
1467
/// Formats a duration as `"{h}h {m}m {s}s"`, `"{m}m {s}s"` or `"{s}s"`.
///
/// Leading zero units are omitted; sub-second precision is discarded.
fn format_duration(d: Duration) -> String {
    let total = d.as_secs();
    let (hours, minutes, seconds) = (total / 3600, total / 60 % 60, total % 60);

    match (hours, minutes) {
        (0, 0) => format!("{}s", seconds),
        (0, _) => format!("{}m {}s", minutes, seconds),
        _ => format!("{}h {}m {}s", hours, minutes, seconds),
    }
}
1483
1484fn termination_status_text(reason: &TerminationReason) -> &'static str {
1486 match reason {
1487 TerminationReason::CompletionPromise => "All tasks completed successfully.",
1488 TerminationReason::MaxIterations => "Stopped at iteration limit.",
1489 TerminationReason::MaxRuntime => "Stopped at runtime limit.",
1490 TerminationReason::MaxCost => "Stopped at cost limit.",
1491 TerminationReason::ConsecutiveFailures => "Too many consecutive failures.",
1492 TerminationReason::LoopThrashing => {
1493 "Loop thrashing detected - same hat repeatedly blocked."
1494 }
1495 TerminationReason::ValidationFailure => "Too many consecutive malformed JSONL events.",
1496 TerminationReason::Stopped => "Manually stopped.",
1497 TerminationReason::Interrupted => "Interrupted by signal.",
1498 TerminationReason::ChaosModeComplete => "Chaos mode exploration complete.",
1499 TerminationReason::ChaosModeMaxIterations => "Chaos mode stopped at iteration limit.",
1500 }
1501}