1mod loop_state;
6#[cfg(test)]
7mod tests;
8
9pub use loop_state::LoopState;
10
11use crate::config::{HatBackend, InjectMode, RalphConfig};
12use crate::event_parser::{EventParser, MutationEvidence, MutationStatus};
13use crate::event_reader::EventReader;
14use crate::hat_registry::HatRegistry;
15use crate::hatless_ralph::HatlessRalph;
16use crate::instructions::InstructionBuilder;
17use crate::loop_context::LoopContext;
18use crate::memory_store::{MarkdownMemoryStore, format_memories_as_markdown, truncate_to_budget};
19use crate::skill_registry::SkillRegistry;
20use ralph_proto::{CheckinContext, Event, EventBus, Hat, HatId, RobotService};
21use std::path::PathBuf;
22use std::sync::Arc;
23use std::sync::atomic::AtomicBool;
24use std::time::Duration;
25use tracing::{debug, info, warn};
26
/// Why the orchestration loop stopped.
///
/// Every variant maps to a process exit code ([`TerminationReason::exit_code`])
/// and to a stable snake_case label ([`TerminationReason::as_str`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationReason {
    /// A completion event was accepted. The only variant treated as success.
    CompletionPromise,
    /// The configured iteration limit was reached.
    MaxIterations,
    /// The configured runtime limit was reached.
    MaxRuntime,
    /// The configured cost limit was reached.
    MaxCost,
    /// Too many iterations failed in a row.
    ConsecutiveFailures,
    /// Abandoned tasks were redispatched too many times.
    LoopThrashing,
    /// Too many consecutive malformed events were seen.
    ValidationFailure,
    /// A stop was requested.
    Stopped,
    /// The run was interrupted.
    Interrupted,
    /// A restart was requested.
    RestartRequested,
}

impl TerminationReason {
    /// Process exit code for this reason.
    ///
    /// * `0` - completion
    /// * `1` - failure or explicit stop
    /// * `2` - a configured limit (iterations, runtime, cost) was hit
    /// * `3` - restart requested
    /// * `130` - interrupted
    pub fn exit_code(&self) -> i32 {
        match self {
            Self::CompletionPromise => 0,
            Self::ConsecutiveFailures
            | Self::LoopThrashing
            | Self::ValidationFailure
            | Self::Stopped => 1,
            Self::MaxIterations | Self::MaxRuntime | Self::MaxCost => 2,
            Self::RestartRequested => 3,
            Self::Interrupted => 130,
        }
    }

    /// Stable snake_case label for this reason.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::CompletionPromise => "completed",
            Self::MaxIterations => "max_iterations",
            Self::MaxRuntime => "max_runtime",
            Self::MaxCost => "max_cost",
            Self::ConsecutiveFailures => "consecutive_failures",
            Self::LoopThrashing => "loop_thrashing",
            Self::ValidationFailure => "validation_failure",
            Self::Stopped => "stopped",
            Self::Interrupted => "interrupted",
            Self::RestartRequested => "restart_requested",
        }
    }

    /// Returns `true` only for [`TerminationReason::CompletionPromise`].
    pub fn is_success(&self) -> bool {
        *self == Self::CompletionPromise
    }
}
100
/// Orchestrator state: configuration, hat registry, event bus and the
/// helpers used to build prompts and decide when the loop should stop.
pub struct EventLoop {
    /// Full configuration; read for limits, feature flags and paths.
    config: RalphConfig,
    /// Custom hats built from `config`.
    registry: HatRegistry,
    /// Event bus on which all registry hats and the `ralph` hat are registered.
    bus: EventBus,
    /// Mutable loop counters (iteration, failures, cost, ...).
    state: LoopState,
    /// Builds prompts for custom hats.
    instruction_builder: InstructionBuilder,
    /// Builds the coordinator ("ralph") prompt.
    ralph: HatlessRalph,
    /// Payloads of `human.guidance` events collected so far.
    robot_guidance: Vec<String>,
    /// Reader over the events file.
    pub(crate) event_reader: EventReader,
    /// Sink for orchestration diagnostics.
    diagnostics: crate::diagnostics::DiagnosticsCollector,
    /// Workspace context; `None` when constructed without one.
    loop_context: Option<LoopContext>,
    /// Skills available for prompt injection.
    skill_registry: SkillRegistry,
    /// Optional robot service used for periodic check-ins.
    robot_service: Option<Box<dyn RobotService>>,
}
123
124impl EventLoop {
125 pub fn new(config: RalphConfig) -> Self {
127 let diagnostics = crate::diagnostics::DiagnosticsCollector::new(std::path::Path::new("."))
130 .unwrap_or_else(|e| {
131 debug!(
132 "Failed to initialize diagnostics: {}, using disabled collector",
133 e
134 );
135 crate::diagnostics::DiagnosticsCollector::disabled()
136 });
137
138 Self::with_diagnostics(config, diagnostics)
139 }
140
141 pub fn with_context(config: RalphConfig, context: LoopContext) -> Self {
147 let diagnostics = crate::diagnostics::DiagnosticsCollector::new(context.workspace())
148 .unwrap_or_else(|e| {
149 debug!(
150 "Failed to initialize diagnostics: {}, using disabled collector",
151 e
152 );
153 crate::diagnostics::DiagnosticsCollector::disabled()
154 });
155
156 Self::with_context_and_diagnostics(config, context, diagnostics)
157 }
158
159 pub fn with_context_and_diagnostics(
161 config: RalphConfig,
162 context: LoopContext,
163 diagnostics: crate::diagnostics::DiagnosticsCollector,
164 ) -> Self {
165 let registry = HatRegistry::from_config(&config);
166 let instruction_builder =
167 InstructionBuilder::with_events(config.core.clone(), config.events.clone());
168
169 let mut bus = EventBus::new();
170
171 for hat in registry.all() {
175 bus.register(hat.clone());
176 }
177
178 let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); bus.register(ralph_hat);
182
183 if registry.is_empty() {
184 debug!("Solo mode: Ralph is the only coordinator");
185 } else {
186 debug!(
187 "Multi-hat mode: {} custom hats + Ralph as fallback",
188 registry.len()
189 );
190 }
191
192 let skill_registry = if config.skills.enabled {
194 SkillRegistry::from_config(
195 &config.skills,
196 context.workspace(),
197 Some(config.cli.backend.as_str()),
198 )
199 .unwrap_or_else(|e| {
200 warn!(
201 "Failed to build skill registry: {}, using empty registry",
202 e
203 );
204 SkillRegistry::new(Some(config.cli.backend.as_str()))
205 })
206 } else {
207 SkillRegistry::new(Some(config.cli.backend.as_str()))
208 };
209
210 let skill_index = if config.skills.enabled {
211 skill_registry.build_index(None)
212 } else {
213 String::new()
214 };
215
216 let ralph = HatlessRalph::new(
218 config.event_loop.completion_promise.clone(),
219 config.core.clone(),
220 ®istry,
221 config.event_loop.starting_event.clone(),
222 )
223 .with_memories_enabled(config.memories.enabled)
224 .with_skill_index(skill_index);
225
226 let events_path = std::fs::read_to_string(context.current_events_marker())
230 .map(|s| {
231 let relative = s.trim();
232 context.workspace().join(relative)
233 })
234 .unwrap_or_else(|_| context.events_path());
235 let event_reader = EventReader::new(&events_path);
236
237 Self {
238 config,
239 registry,
240 bus,
241 state: LoopState::new(),
242 instruction_builder,
243 ralph,
244 robot_guidance: Vec::new(),
245 event_reader,
246 diagnostics,
247 loop_context: Some(context),
248 skill_registry,
249 robot_service: None,
250 }
251 }
252
253 pub fn with_diagnostics(
255 config: RalphConfig,
256 diagnostics: crate::diagnostics::DiagnosticsCollector,
257 ) -> Self {
258 let registry = HatRegistry::from_config(&config);
259 let instruction_builder =
260 InstructionBuilder::with_events(config.core.clone(), config.events.clone());
261
262 let mut bus = EventBus::new();
263
264 for hat in registry.all() {
268 bus.register(hat.clone());
269 }
270
271 let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); bus.register(ralph_hat);
275
276 if registry.is_empty() {
277 debug!("Solo mode: Ralph is the only coordinator");
278 } else {
279 debug!(
280 "Multi-hat mode: {} custom hats + Ralph as fallback",
281 registry.len()
282 );
283 }
284
285 let workspace_root = std::path::Path::new(".");
287 let skill_registry = if config.skills.enabled {
288 SkillRegistry::from_config(
289 &config.skills,
290 workspace_root,
291 Some(config.cli.backend.as_str()),
292 )
293 .unwrap_or_else(|e| {
294 warn!(
295 "Failed to build skill registry: {}, using empty registry",
296 e
297 );
298 SkillRegistry::new(Some(config.cli.backend.as_str()))
299 })
300 } else {
301 SkillRegistry::new(Some(config.cli.backend.as_str()))
302 };
303
304 let skill_index = if config.skills.enabled {
305 skill_registry.build_index(None)
306 } else {
307 String::new()
308 };
309
310 let ralph = HatlessRalph::new(
312 config.event_loop.completion_promise.clone(),
313 config.core.clone(),
314 ®istry,
315 config.event_loop.starting_event.clone(),
316 )
317 .with_memories_enabled(config.memories.enabled)
318 .with_skill_index(skill_index);
319
320 let events_path = std::fs::read_to_string(".ralph/current-events")
323 .map(|s| s.trim().to_string())
324 .unwrap_or_else(|_| ".ralph/events.jsonl".to_string());
325 let event_reader = EventReader::new(&events_path);
326
327 Self {
328 config,
329 registry,
330 bus,
331 state: LoopState::new(),
332 instruction_builder,
333 ralph,
334 robot_guidance: Vec::new(),
335 event_reader,
336 diagnostics,
337 loop_context: None,
338 skill_registry,
339 robot_service: None,
340 }
341 }
342
    /// Installs the robot service, replacing any previously set one.
    pub fn set_robot_service(&mut self, service: Box<dyn RobotService>) {
        self.robot_service = Some(service);
    }
353
    /// Returns the attached loop context, or `None` if the loop was built
    /// without one.
    pub fn loop_context(&self) -> Option<&LoopContext> {
        self.loop_context.as_ref()
    }
358
359 fn tasks_path(&self) -> PathBuf {
361 self.loop_context
362 .as_ref()
363 .map(|ctx| ctx.tasks_path())
364 .unwrap_or_else(|| PathBuf::from(".ralph/agent/tasks.jsonl"))
365 }
366
367 fn scratchpad_path(&self) -> PathBuf {
369 self.loop_context
370 .as_ref()
371 .map(|ctx| ctx.scratchpad_path())
372 .unwrap_or_else(|| PathBuf::from(&self.config.core.scratchpad))
373 }
374
    /// Read-only view of the loop state.
    pub fn state(&self) -> &LoopState {
        &self.state
    }
379
    /// The configuration this loop was built with.
    pub fn config(&self) -> &RalphConfig {
        &self.config
    }
384
    /// The registry of custom hats.
    pub fn registry(&self) -> &HatRegistry {
        &self.registry
    }
389
390 pub fn get_hat_backend(&self, hat_id: &HatId) -> Option<&HatBackend> {
395 self.registry
396 .get_config(hat_id)
397 .and_then(|config| config.backend.as_ref())
398 }
399
    /// Registers `observer` on the event bus.
    pub fn add_observer<F>(&mut self, observer: F)
    where
        F: Fn(&Event) + Send + 'static,
    {
        self.bus.add_observer(observer);
    }
410
    /// Forwards `observer` to the bus's deprecated `set_observer`.
    ///
    /// Kept for backward compatibility; prefer [`EventLoop::add_observer`].
    #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
    pub fn set_observer<F>(&mut self, observer: F)
    where
        F: Fn(&Event) + Send + 'static,
    {
        // The bus method is itself deprecated; silence the warning for this shim.
        #[allow(deprecated)]
        self.bus.set_observer(observer);
    }
422
423 pub fn check_termination(&self) -> Option<TerminationReason> {
425 let cfg = &self.config.event_loop;
426
427 if self.state.iteration >= cfg.max_iterations {
428 return Some(TerminationReason::MaxIterations);
429 }
430
431 if self.state.elapsed().as_secs() >= cfg.max_runtime_seconds {
432 return Some(TerminationReason::MaxRuntime);
433 }
434
435 if let Some(max_cost) = cfg.max_cost_usd
436 && self.state.cumulative_cost >= max_cost
437 {
438 return Some(TerminationReason::MaxCost);
439 }
440
441 if self.state.consecutive_failures >= cfg.max_consecutive_failures {
442 return Some(TerminationReason::ConsecutiveFailures);
443 }
444
445 if self.state.abandoned_task_redispatches >= 3 {
447 return Some(TerminationReason::LoopThrashing);
448 }
449
450 if self.state.consecutive_malformed_events >= 3 {
452 return Some(TerminationReason::ValidationFailure);
453 }
454
455 let stop_path =
457 std::path::Path::new(&self.config.core.workspace_root).join(".ralph/stop-requested");
458 if stop_path.exists() {
459 let _ = std::fs::remove_file(&stop_path);
460 return Some(TerminationReason::Stopped);
461 }
462
463 let restart_path =
465 std::path::Path::new(&self.config.core.workspace_root).join(".ralph/restart-requested");
466 if restart_path.exists() {
467 return Some(TerminationReason::RestartRequested);
468 }
469
470 None
471 }
472
473 pub fn check_completion_event(&mut self) -> Option<TerminationReason> {
477 if !self.state.completion_requested {
478 return None;
479 }
480
481 self.state.completion_requested = false;
482
483 if self.config.event_loop.persistent {
485 info!("Completion event suppressed - persistent mode active, loop staying alive");
486
487 self.diagnostics.log_orchestration(
488 self.state.iteration,
489 "loop",
490 crate::diagnostics::OrchestrationEvent::LoopTerminated {
491 reason: "completion_event_suppressed_persistent".to_string(),
492 },
493 );
494
495 let resume_event = Event::new(
497 "task.resume",
498 "Persistent mode: loop staying alive after completion signal. \
499 Check for new tasks or await human guidance.",
500 );
501 self.bus.publish(resume_event);
502
503 return None;
504 }
505
506 if self.config.memories.enabled {
508 if let Ok(false) = self.verify_tasks_complete() {
509 let open_tasks = self.get_open_task_list();
510 warn!(
511 open_tasks = ?open_tasks,
512 "Completion event with {} open task(s) - trusting agent decision",
513 open_tasks.len()
514 );
515 }
516 } else if let Ok(false) = self.verify_scratchpad_complete() {
517 warn!("Completion event with pending scratchpad tasks - trusting agent decision");
518 }
519
520 info!("Completion event detected - terminating");
521
522 self.diagnostics.log_orchestration(
524 self.state.iteration,
525 "loop",
526 crate::diagnostics::OrchestrationEvent::LoopTerminated {
527 reason: "completion_event".to_string(),
528 },
529 );
530
531 Some(TerminationReason::CompletionPromise)
532 }
533
534 pub fn initialize(&mut self, prompt_content: &str) {
536 let topic = self
538 .config
539 .event_loop
540 .starting_event
541 .clone()
542 .unwrap_or_else(|| "task.start".to_string());
543 self.initialize_with_topic(&topic, prompt_content);
544 }
545
    /// Starts the loop with a `task.resume` event carrying `prompt_content`,
    /// regardless of the configured starting event.
    pub fn initialize_resume(&mut self, prompt_content: &str) {
        self.initialize_with_topic("task.resume", prompt_content);
    }
554
555 fn initialize_with_topic(&mut self, topic: &str, prompt_content: &str) {
557 self.ralph.set_objective(prompt_content.to_string());
561
562 let start_event = Event::new(topic, prompt_content);
563 self.bus.publish(start_event);
564 debug!(topic = topic, "Published {} event", topic);
565 }
566
567 pub fn next_hat(&self) -> Option<&HatId> {
576 let next = self.bus.next_hat_with_pending();
577
578 if next.is_none() && self.bus.has_human_pending() {
580 return self.bus.hat_ids().find(|id| id.as_str() == "ralph");
581 }
582
583 next.as_ref()?;
585
586 if self.registry.is_empty() {
589 next
591 } else {
592 self.bus.hat_ids().find(|id| id.as_str() == "ralph")
595 }
596 }
597
    /// Returns `true` when any hat has pending events or human events are
    /// waiting on the bus.
    pub fn has_pending_events(&self) -> bool {
        self.bus.next_hat_with_pending().is_some() || self.bus.has_human_pending()
    }
605
    /// Returns `true` when human events are waiting on the bus.
    pub fn has_pending_human_events(&self) -> bool {
        self.bus.has_human_pending()
    }
613
614 pub fn get_hat_publishes(&self, hat_id: &HatId) -> Vec<String> {
618 self.registry
619 .get(hat_id)
620 .map(|hat| hat.publishes.iter().map(|t| t.to_string()).collect())
621 .unwrap_or_default()
622 }
623
624 pub fn inject_fallback_event(&mut self) -> bool {
631 let fallback_event = Event::new(
632 "task.resume",
633 "RECOVERY: Previous iteration did not publish an event. \
634 Review the scratchpad and either dispatch the next task or complete the loop.",
635 );
636
637 let fallback_event = match &self.state.last_hat {
640 Some(hat_id) if hat_id.as_str() != "ralph" => {
641 debug!(
642 hat = %hat_id.as_str(),
643 "Injecting fallback event to recover - targeting last hat with task.resume"
644 );
645 fallback_event.with_target(hat_id.clone())
646 }
647 _ => {
648 debug!("Injecting fallback event to recover - triggering Ralph with task.resume");
649 fallback_event
650 }
651 };
652
653 self.bus.publish(fallback_event);
654 true
655 }
656
657 pub fn build_prompt(&mut self, hat_id: &HatId) -> Option<String> {
671 if hat_id.as_str() == "ralph" {
674 if self.registry.is_empty() {
675 let mut events = self.bus.take_pending(&hat_id.clone());
677 let mut human_events = self.bus.take_human_pending();
678 events.append(&mut human_events);
679
680 let (guidance_events, regular_events): (Vec<_>, Vec<_>) = events
682 .into_iter()
683 .partition(|e| e.topic.as_str() == "human.guidance");
684
685 let events_context = regular_events
686 .iter()
687 .map(|e| Self::format_event(e))
688 .collect::<Vec<_>>()
689 .join("\n");
690
691 self.update_robot_guidance(guidance_events);
693 self.apply_robot_guidance();
694
695 let base_prompt = self.ralph.build_prompt(&events_context, &[]);
697 self.ralph.clear_robot_guidance();
698 let with_skills = self.prepend_auto_inject_skills(base_prompt);
699 let with_scratchpad = self.prepend_scratchpad(with_skills);
700 let final_prompt = self.prepend_ready_tasks(with_scratchpad);
701
702 debug!("build_prompt: routing to HatlessRalph (solo mode)");
703 return Some(final_prompt);
704 } else {
705 let mut all_hat_ids: Vec<HatId> = self.bus.hat_ids().cloned().collect();
707 all_hat_ids.sort_by(|a, b| a.as_str().cmp(b.as_str()));
709
710 let mut all_events = Vec::new();
711 let mut system_events = Vec::new();
712
713 for id in &all_hat_ids {
714 let pending = self.bus.take_pending(id);
715 if pending.is_empty() {
716 continue;
717 }
718
719 let (drop_pending, exhausted_event) = self.check_hat_exhaustion(id, &pending);
720 if drop_pending {
721 if let Some(exhausted_event) = exhausted_event {
723 all_events.push(exhausted_event.clone());
724 system_events.push(exhausted_event);
725 }
726 continue;
727 }
728
729 all_events.extend(pending);
730 }
731
732 let mut human_events = self.bus.take_human_pending();
733 all_events.append(&mut human_events);
734
735 for event in system_events {
738 self.bus.publish(event);
739 }
740
741 let (guidance_events, regular_events): (Vec<_>, Vec<_>) = all_events
743 .into_iter()
744 .partition(|e| e.topic.as_str() == "human.guidance");
745
746 self.update_robot_guidance(guidance_events);
749 self.apply_robot_guidance();
750
751 let active_hat_ids = self.determine_active_hat_ids(®ular_events);
753 self.record_hat_activations(&active_hat_ids);
754 let active_hats = self.determine_active_hats(®ular_events);
755
756 let events_context = regular_events
758 .iter()
759 .map(|e| Self::format_event(e))
760 .collect::<Vec<_>>()
761 .join("\n");
762
763 let base_prompt = self.ralph.build_prompt(&events_context, &active_hats);
765
766 debug!(
768 "build_prompt: routing to HatlessRalph (multi-hat coordinator mode), active_hats: {:?}",
769 active_hats
770 .iter()
771 .map(|h| h.id.as_str())
772 .collect::<Vec<_>>()
773 );
774
775 self.ralph.clear_robot_guidance();
777 let with_skills = self.prepend_auto_inject_skills(base_prompt);
778 let with_scratchpad = self.prepend_scratchpad(with_skills);
779 let final_prompt = self.prepend_ready_tasks(with_scratchpad);
780
781 return Some(final_prompt);
782 }
783 }
784
785 let events = self.bus.take_pending(&hat_id.clone());
789 let events_context = events
790 .iter()
791 .map(|e| Self::format_event(e))
792 .collect::<Vec<_>>()
793 .join("\n");
794
795 let hat = self.registry.get(hat_id)?;
796
797 debug!(
799 "build_prompt: hat_id='{}', instructions.is_empty()={}",
800 hat_id.as_str(),
801 hat.instructions.is_empty()
802 );
803
804 debug!(
806 "build_prompt: routing to build_custom_hat() for '{}'",
807 hat_id.as_str()
808 );
809 Some(
810 self.instruction_builder
811 .build_custom_hat(hat, &events_context),
812 )
813 }
814
815 fn update_robot_guidance(&mut self, guidance_events: Vec<Event>) {
821 if guidance_events.is_empty() {
822 return;
823 }
824
825 self.persist_guidance_to_scratchpad(&guidance_events);
827
828 self.robot_guidance
829 .extend(guidance_events.into_iter().map(|e| e.payload));
830 }
831
832 fn persist_guidance_to_scratchpad(&self, guidance_events: &[Event]) {
837 use std::io::Write;
838
839 let scratchpad_path = self.scratchpad_path();
840 let resolved_path = if scratchpad_path.is_relative() {
841 self.config.core.workspace_root.join(&scratchpad_path)
842 } else {
843 scratchpad_path
844 };
845
846 if let Some(parent) = resolved_path.parent()
848 && !parent.exists()
849 && let Err(e) = std::fs::create_dir_all(parent)
850 {
851 warn!("Failed to create scratchpad directory: {}", e);
852 return;
853 }
854
855 let mut file = match std::fs::OpenOptions::new()
856 .create(true)
857 .append(true)
858 .open(&resolved_path)
859 {
860 Ok(f) => f,
861 Err(e) => {
862 warn!("Failed to open scratchpad for guidance persistence: {}", e);
863 return;
864 }
865 };
866
867 let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
868 for event in guidance_events {
869 let entry = format!(
870 "\n### HUMAN GUIDANCE ({})\n\n{}\n",
871 timestamp, event.payload
872 );
873 if let Err(e) = file.write_all(entry.as_bytes()) {
874 warn!("Failed to write guidance to scratchpad: {}", e);
875 }
876 }
877
878 info!(
879 count = guidance_events.len(),
880 "Persisted human guidance to scratchpad"
881 );
882 }
883
884 fn apply_robot_guidance(&mut self) {
886 if self.robot_guidance.is_empty() {
887 return;
888 }
889
890 self.ralph.set_robot_guidance(self.robot_guidance.clone());
891 }
892
893 fn prepend_auto_inject_skills(&self, prompt: String) -> String {
903 let mut prefix = String::new();
904
905 self.inject_memories_and_tools_skill(&mut prefix);
907
908 self.inject_robot_skill(&mut prefix);
910
911 self.inject_custom_auto_skills(&mut prefix);
913
914 if prefix.is_empty() {
915 return prompt;
916 }
917
918 prefix.push_str("\n\n");
919 prefix.push_str(&prompt);
920 prefix
921 }
922
923 fn inject_memories_and_tools_skill(&self, prefix: &mut String) {
931 let memories_config = &self.config.memories;
932
933 if memories_config.enabled && memories_config.inject == InjectMode::Auto {
935 info!(
936 "Memory injection check: enabled={}, inject={:?}, workspace_root={:?}",
937 memories_config.enabled, memories_config.inject, self.config.core.workspace_root
938 );
939
940 let workspace_root = &self.config.core.workspace_root;
941 let store = MarkdownMemoryStore::with_default_path(workspace_root);
942 let memories_path = workspace_root.join(".ralph/agent/memories.md");
943
944 info!(
945 "Looking for memories at: {:?} (exists: {})",
946 memories_path,
947 memories_path.exists()
948 );
949
950 let memories = match store.load() {
951 Ok(memories) => {
952 info!("Successfully loaded {} memories from store", memories.len());
953 memories
954 }
955 Err(e) => {
956 info!(
957 "Failed to load memories for injection: {} (path: {:?})",
958 e, memories_path
959 );
960 Vec::new()
961 }
962 };
963
964 if memories.is_empty() {
965 info!("Memory store is empty - no memories to inject");
966 } else {
967 let mut memories_content = format_memories_as_markdown(&memories);
968
969 if memories_config.budget > 0 {
970 let original_len = memories_content.len();
971 memories_content =
972 truncate_to_budget(&memories_content, memories_config.budget);
973 debug!(
974 "Applied budget: {} chars -> {} chars (budget: {})",
975 original_len,
976 memories_content.len(),
977 memories_config.budget
978 );
979 }
980
981 info!(
982 "Injecting {} memories ({} chars) into prompt",
983 memories.len(),
984 memories_content.len()
985 );
986
987 prefix.push_str(&memories_content);
988 }
989 }
990
991 if memories_config.enabled || self.config.tasks.enabled {
993 if let Some(skill) = self.skill_registry.get("ralph-tools") {
994 if !prefix.is_empty() {
995 prefix.push_str("\n\n");
996 }
997 prefix.push_str(&format!(
998 "<ralph-tools-skill>\n{}\n</ralph-tools-skill>",
999 skill.content.trim()
1000 ));
1001 debug!("Injected ralph-tools skill from registry");
1002 } else {
1003 debug!("ralph-tools skill not found in registry - skill content not injected");
1004 }
1005 }
1006 }
1007
1008 fn inject_robot_skill(&self, prefix: &mut String) {
1013 if !self.config.robot.enabled {
1014 return;
1015 }
1016
1017 if let Some(skill) = self.skill_registry.get("robot-interaction") {
1018 if !prefix.is_empty() {
1019 prefix.push_str("\n\n");
1020 }
1021 prefix.push_str(&format!(
1022 "<robot-skill>\n{}\n</robot-skill>",
1023 skill.content.trim()
1024 ));
1025 debug!("Injected robot interaction skill from registry");
1026 }
1027 }
1028
1029 fn inject_custom_auto_skills(&self, prefix: &mut String) {
1031 for skill in self.skill_registry.auto_inject_skills(None) {
1032 if skill.name == "ralph-tools" || skill.name == "robot-interaction" {
1034 continue;
1035 }
1036
1037 if !prefix.is_empty() {
1038 prefix.push_str("\n\n");
1039 }
1040 prefix.push_str(&format!(
1041 "<{name}-skill>\n{content}\n</{name}-skill>",
1042 name = skill.name,
1043 content = skill.content.trim()
1044 ));
1045 debug!("Injected auto-inject skill: {}", skill.name);
1046 }
1047 }
1048
1049 fn prepend_scratchpad(&self, prompt: String) -> String {
1055 let scratchpad_path = self.scratchpad_path();
1056
1057 let resolved_path = if scratchpad_path.is_relative() {
1058 self.config.core.workspace_root.join(&scratchpad_path)
1059 } else {
1060 scratchpad_path
1061 };
1062
1063 if !resolved_path.exists() {
1064 debug!(
1065 "Scratchpad not found at {:?}, skipping injection",
1066 resolved_path
1067 );
1068 return prompt;
1069 }
1070
1071 let content = match std::fs::read_to_string(&resolved_path) {
1072 Ok(c) => c,
1073 Err(e) => {
1074 info!("Failed to read scratchpad for injection: {}", e);
1075 return prompt;
1076 }
1077 };
1078
1079 if content.trim().is_empty() {
1080 debug!("Scratchpad is empty, skipping injection");
1081 return prompt;
1082 }
1083
1084 let char_budget = 4000 * 4;
1086 let content = if content.len() > char_budget {
1087 let mut start = content.len() - char_budget;
1089 while start < content.len() && !content.is_char_boundary(start) {
1090 start += 1;
1091 }
1092 let line_start = content[start..].find('\n').map_or(start, |n| start + n + 1);
1093 let discarded = &content[..line_start];
1094
1095 let headings: Vec<&str> = discarded
1097 .lines()
1098 .filter(|line| line.starts_with('#'))
1099 .collect();
1100 let summary = if headings.is_empty() {
1101 format!(
1102 "<!-- earlier content truncated ({} chars omitted) -->",
1103 line_start
1104 )
1105 } else {
1106 format!(
1107 "<!-- earlier content truncated ({} chars omitted) -->\n\
1108 <!-- discarded sections: {} -->",
1109 line_start,
1110 headings.join(" | ")
1111 )
1112 };
1113
1114 format!("{}\n\n{}", summary, &content[line_start..])
1115 } else {
1116 content
1117 };
1118
1119 info!("Injecting scratchpad ({} chars) into prompt", content.len());
1120
1121 let mut final_prompt = format!(
1122 "<scratchpad path=\"{}\">\n{}\n</scratchpad>\n\n",
1123 self.config.core.scratchpad, content
1124 );
1125 final_prompt.push_str(&prompt);
1126 final_prompt
1127 }
1128
1129 fn prepend_ready_tasks(&self, prompt: String) -> String {
1135 if !self.config.tasks.enabled {
1136 return prompt;
1137 }
1138
1139 use crate::task::TaskStatus;
1140 use crate::task_store::TaskStore;
1141
1142 let tasks_path = self.tasks_path();
1143 let resolved_path = if tasks_path.is_relative() {
1144 self.config.core.workspace_root.join(&tasks_path)
1145 } else {
1146 tasks_path
1147 };
1148
1149 if !resolved_path.exists() {
1150 return prompt;
1151 }
1152
1153 let store = match TaskStore::load(&resolved_path) {
1154 Ok(s) => s,
1155 Err(e) => {
1156 info!("Failed to load task store for injection: {}", e);
1157 return prompt;
1158 }
1159 };
1160
1161 let ready = store.ready();
1162 let open = store.open();
1163 let closed_count = store.all().len() - open.len();
1164
1165 if open.is_empty() && closed_count == 0 {
1166 return prompt;
1167 }
1168
1169 let mut section = String::from("<ready-tasks>\n");
1170 if ready.is_empty() && open.is_empty() {
1171 section.push_str("No open tasks. Create tasks with `ralph tools task add`.\n");
1172 } else {
1173 section.push_str(&format!(
1174 "## Tasks: {} ready, {} open, {} closed\n\n",
1175 ready.len(),
1176 open.len(),
1177 closed_count
1178 ));
1179 for task in &ready {
1180 let status_icon = match task.status {
1181 TaskStatus::Open => "[ ]",
1182 TaskStatus::InProgress => "[~]",
1183 _ => "[?]",
1184 };
1185 section.push_str(&format!(
1186 "- {} [P{}] {} ({})\n",
1187 status_icon, task.priority, task.title, task.id
1188 ));
1189 }
1190 let ready_ids: Vec<&str> = ready.iter().map(|t| t.id.as_str()).collect();
1192 let blocked: Vec<_> = open
1193 .iter()
1194 .filter(|t| !ready_ids.contains(&t.id.as_str()))
1195 .collect();
1196 if !blocked.is_empty() {
1197 section.push_str("\nBlocked:\n");
1198 for task in blocked {
1199 section.push_str(&format!(
1200 "- [blocked] [P{}] {} ({}) — blocked by: {}\n",
1201 task.priority,
1202 task.title,
1203 task.id,
1204 task.blocked_by.join(", ")
1205 ));
1206 }
1207 }
1208 }
1209 section.push_str("</ready-tasks>\n\n");
1210
1211 info!(
1212 "Injecting ready tasks ({} ready, {} open, {} closed) into prompt",
1213 ready.len(),
1214 open.len(),
1215 closed_count
1216 );
1217
1218 let mut final_prompt = section;
1219 final_prompt.push_str(&prompt);
1220 final_prompt
1221 }
1222
    /// Builds a Ralph prompt from `prompt_content` with no active hats.
    ///
    /// Unlike `build_prompt`, this does not drain the bus or prepend skills,
    /// scratchpad or tasks.
    pub fn build_ralph_prompt(&self, prompt_content: &str) -> String {
        self.ralph.build_prompt(prompt_content, &[])
    }
1227
1228 fn determine_active_hats(&self, events: &[Event]) -> Vec<&Hat> {
1231 let mut active_hats = Vec::new();
1232 for id in self.determine_active_hat_ids(events) {
1233 if let Some(hat) = self.registry.get(&id) {
1234 active_hats.push(hat);
1235 }
1236 }
1237 active_hats
1238 }
1239
1240 fn determine_active_hat_ids(&self, events: &[Event]) -> Vec<HatId> {
1241 let mut active_hat_ids = Vec::new();
1242 for event in events {
1243 if let Some(hat) = self.registry.get_for_topic(event.topic.as_str()) {
1244 if !active_hat_ids.iter().any(|id| id == &hat.id) {
1246 active_hat_ids.push(hat.id.clone());
1247 }
1248 }
1249 }
1250 active_hat_ids
1251 }
1252
1253 fn format_event(event: &Event) -> String {
1258 let topic = &event.topic;
1259 let payload = &event.payload;
1260
1261 if topic.as_str() == "task.start" || topic.as_str() == "task.resume" {
1262 format!(
1263 "Event: {} - <top-level-prompt>\n{}\n</top-level-prompt>",
1264 topic, payload
1265 )
1266 } else {
1267 format!("Event: {} - {}", topic, payload)
1268 }
1269 }
1270
1271 fn check_hat_exhaustion(&mut self, hat_id: &HatId, dropped: &[Event]) -> (bool, Option<Event>) {
1272 let Some(config) = self.registry.get_config(hat_id) else {
1273 return (false, None);
1274 };
1275 let Some(max) = config.max_activations else {
1276 return (false, None);
1277 };
1278
1279 let count = *self.state.hat_activation_counts.get(hat_id).unwrap_or(&0);
1280 if count < max {
1281 return (false, None);
1282 }
1283
1284 let should_emit = self.state.exhausted_hats.insert(hat_id.clone());
1286
1287 if !should_emit {
1288 return (true, None);
1290 }
1291
1292 let mut dropped_topics: Vec<String> = dropped.iter().map(|e| e.topic.to_string()).collect();
1293 dropped_topics.sort();
1294
1295 let payload = format!(
1296 "Hat '{hat}' exhausted.\n- max_activations: {max}\n- activations: {count}\n- dropped_topics:\n - {topics}",
1297 hat = hat_id.as_str(),
1298 max = max,
1299 count = count,
1300 topics = dropped_topics.join("\n - ")
1301 );
1302
1303 warn!(
1304 hat = %hat_id.as_str(),
1305 max_activations = max,
1306 activations = count,
1307 "Hat exhausted (max_activations reached)"
1308 );
1309
1310 (
1311 true,
1312 Some(Event::new(
1313 format!("{}.exhausted", hat_id.as_str()),
1314 payload,
1315 )),
1316 )
1317 }
1318
1319 fn record_hat_activations(&mut self, active_hat_ids: &[HatId]) {
1320 for hat_id in active_hat_ids {
1321 *self
1322 .state
1323 .hat_activation_counts
1324 .entry(hat_id.clone())
1325 .or_insert(0) += 1;
1326 }
1327 }
1328
1329 pub fn get_active_hat_id(&self) -> HatId {
1332 for hat_id in self.bus.hat_ids() {
1334 let Some(events) = self.bus.peek_pending(hat_id) else {
1335 continue;
1336 };
1337 let Some(event) = events.first() else {
1338 continue;
1339 };
1340 if let Some(active_hat) = self.registry.get_for_topic(event.topic.as_str()) {
1341 return active_hat.id.clone();
1342 }
1343 }
1344 HatId::new("ralph")
1345 }
1346
1347 pub fn record_event_count(&mut self) -> usize {
1352 self.event_reader
1353 .read_new_events()
1354 .map(|r| r.events.len())
1355 .unwrap_or(0)
1356 }
1357
1358 pub fn check_default_publishes(&mut self, hat_id: &HatId, _events_before: usize) {
1364 let events_after = self
1365 .event_reader
1366 .read_new_events()
1367 .map(|r| r.events.len())
1368 .unwrap_or(0);
1369
1370 if events_after == 0
1371 && let Some(config) = self.registry.get_config(hat_id)
1372 && let Some(default_topic) = &config.default_publishes
1373 {
1374 let default_event = Event::new(default_topic.as_str(), "").with_source(hat_id.clone());
1376
1377 debug!(
1378 hat = %hat_id.as_str(),
1379 topic = %default_topic,
1380 "No events written by hat, injecting default_publishes event"
1381 );
1382
1383 self.bus.publish(default_event);
1384 }
1385 }
1386
    /// Mutable access to the underlying event bus.
    pub fn bus(&mut self) -> &mut EventBus {
        &mut self.bus
    }
1394
1395 pub fn process_output(
1399 &mut self,
1400 hat_id: &HatId,
1401 output: &str,
1402 success: bool,
1403 ) -> Option<TerminationReason> {
1404 self.state.iteration += 1;
1405 self.state.last_hat = Some(hat_id.clone());
1406
1407 if let Some(interval_secs) = self.config.robot.checkin_interval_seconds
1409 && let Some(ref robot_service) = self.robot_service
1410 {
1411 let elapsed = self.state.elapsed();
1412 let interval = std::time::Duration::from_secs(interval_secs);
1413 let last = self
1414 .state
1415 .last_checkin_at
1416 .map(|t| t.elapsed())
1417 .unwrap_or(elapsed);
1418
1419 if last >= interval {
1420 let context = self.build_checkin_context(hat_id);
1421 match robot_service.send_checkin(self.state.iteration, elapsed, Some(&context)) {
1422 Ok(_) => {
1423 self.state.last_checkin_at = Some(std::time::Instant::now());
1424 debug!(iteration = self.state.iteration, "Sent robot check-in");
1425 }
1426 Err(e) => {
1427 warn!(error = %e, "Failed to send robot check-in");
1428 }
1429 }
1430 }
1431 }
1432
1433 self.diagnostics.log_orchestration(
1435 self.state.iteration,
1436 "loop",
1437 crate::diagnostics::OrchestrationEvent::IterationStarted,
1438 );
1439
1440 self.diagnostics.log_orchestration(
1442 self.state.iteration,
1443 "loop",
1444 crate::diagnostics::OrchestrationEvent::HatSelected {
1445 hat: hat_id.to_string(),
1446 reason: "process_output".to_string(),
1447 },
1448 );
1449
1450 if success {
1452 self.state.consecutive_failures = 0;
1453 } else {
1454 self.state.consecutive_failures += 1;
1455 }
1456
1457 let _ = output;
1458
1459 self.check_termination()
1465 }
1466
1467 fn extract_task_id(payload: &str) -> String {
1470 payload
1471 .lines()
1472 .next()
1473 .unwrap_or("unknown")
1474 .trim()
1475 .to_string()
1476 }
1477
1478 pub fn add_cost(&mut self, cost: f64) {
1480 self.state.cumulative_cost += cost;
1481 }
1482
1483 fn verify_scratchpad_complete(&self) -> Result<bool, std::io::Error> {
1490 let scratchpad_path = self.scratchpad_path();
1491
1492 if !scratchpad_path.exists() {
1493 return Err(std::io::Error::new(
1494 std::io::ErrorKind::NotFound,
1495 "Scratchpad does not exist",
1496 ));
1497 }
1498
1499 let content = std::fs::read_to_string(scratchpad_path)?;
1500
1501 let has_pending = content
1502 .lines()
1503 .any(|line| line.trim_start().starts_with("- [ ]"));
1504
1505 Ok(!has_pending)
1506 }
1507
1508 fn verify_tasks_complete(&self) -> Result<bool, std::io::Error> {
1509 use crate::task_store::TaskStore;
1510
1511 let tasks_path = self.tasks_path();
1512
1513 if !tasks_path.exists() {
1515 return Ok(true);
1516 }
1517
1518 let store = TaskStore::load(&tasks_path)?;
1519 Ok(!store.has_pending_tasks())
1520 }
1521
1522 fn build_checkin_context(&self, hat_id: &HatId) -> CheckinContext {
1524 let (open_tasks, closed_tasks) = self.count_tasks();
1525 CheckinContext {
1526 current_hat: Some(hat_id.as_str().to_string()),
1527 open_tasks,
1528 closed_tasks,
1529 cumulative_cost: self.state.cumulative_cost,
1530 }
1531 }
1532
1533 fn count_tasks(&self) -> (usize, usize) {
1538 use crate::task::TaskStatus;
1539 use crate::task_store::TaskStore;
1540
1541 let tasks_path = self.tasks_path();
1542 if !tasks_path.exists() {
1543 return (0, 0);
1544 }
1545
1546 match TaskStore::load(&tasks_path) {
1547 Ok(store) => {
1548 let total = store.all().len();
1549 let open = store.open().len();
1550 let closed = total - open;
1551 debug_assert_eq!(
1553 closed,
1554 store
1555 .all()
1556 .iter()
1557 .filter(|t| t.status == TaskStatus::Closed)
1558 .count()
1559 );
1560 (open, closed)
1561 }
1562 Err(_) => (0, 0),
1563 }
1564 }
1565
1566 fn get_open_task_list(&self) -> Vec<String> {
1568 use crate::task_store::TaskStore;
1569
1570 let tasks_path = self.tasks_path();
1571 if let Ok(store) = TaskStore::load(&tasks_path) {
1572 return store
1573 .open()
1574 .iter()
1575 .map(|t| format!("{}: {}", t.id, t.title))
1576 .collect();
1577 }
1578 vec![]
1579 }
1580
1581 fn warn_on_mutation_evidence(&self, evidence: &crate::event_parser::BackpressureEvidence) {
1582 let threshold = self.config.event_loop.mutation_score_warn_threshold;
1583
1584 match &evidence.mutants {
1585 Some(mutants) => {
1586 if let Some(reason) = Self::mutation_warning_reason(mutants, threshold) {
1587 warn!(
1588 reason = %reason,
1589 mutants_status = ?mutants.status,
1590 mutants_score = mutants.score_percent,
1591 mutants_threshold = threshold,
1592 "Mutation testing warning"
1593 );
1594 }
1595 }
1596 None => {
1597 if let Some(threshold) = threshold {
1598 warn!(
1599 mutants_threshold = threshold,
1600 "Mutation testing warning: missing mutation evidence in build.done payload"
1601 );
1602 }
1603 }
1604 }
1605 }
1606
1607 fn mutation_warning_reason(
1608 mutants: &MutationEvidence,
1609 threshold: Option<f64>,
1610 ) -> Option<String> {
1611 match mutants.status {
1612 MutationStatus::Fail => Some("mutation testing failed".to_string()),
1613 MutationStatus::Warn => Some(Self::format_mutation_message(
1614 "mutation score below threshold",
1615 mutants.score_percent,
1616 )),
1617 MutationStatus::Unknown => Some("mutation testing status unknown".to_string()),
1618 MutationStatus::Pass => {
1619 let threshold = threshold?;
1620
1621 match mutants.score_percent {
1622 Some(score) if score < threshold => Some(format!(
1623 "mutation score {:.2}% below threshold {:.2}%",
1624 score, threshold
1625 )),
1626 Some(_) => None,
1627 None => Some(format!(
1628 "mutation score missing (threshold {:.2}%)",
1629 threshold
1630 )),
1631 }
1632 }
1633 }
1634 }
1635
1636 fn format_mutation_message(message: &str, score: Option<f64>) -> String {
1637 match score {
1638 Some(score) => format!("{message} ({score:.2}%)"),
1639 None => message.to_string(),
1640 }
1641 }
1642
1643 pub fn process_events_from_jsonl(&mut self) -> std::io::Result<bool> {
1652 let result = self.event_reader.read_new_events()?;
1653
1654 for malformed in &result.malformed {
1656 let payload = format!(
1657 "Line {}: {}\nContent: {}",
1658 malformed.line_number, malformed.error, &malformed.content
1659 );
1660 let event = Event::new("event.malformed", &payload);
1661 self.bus.publish(event);
1662 self.state.consecutive_malformed_events += 1;
1663 warn!(
1664 line = malformed.line_number,
1665 consecutive = self.state.consecutive_malformed_events,
1666 "Malformed event line detected"
1667 );
1668 }
1669
1670 if !result.events.is_empty() {
1672 self.state.consecutive_malformed_events = 0;
1673 }
1674
1675 if result.events.is_empty() && result.malformed.is_empty() {
1676 return Ok(false);
1677 }
1678
1679 let mut has_orphans = false;
1680
1681 let mut validated_events = Vec::new();
1683 let completion_topic = self.config.event_loop.completion_promise.as_str();
1684 let total_events = result.events.len();
1685 for (index, event) in result.events.into_iter().enumerate() {
1686 let payload = event.payload.clone().unwrap_or_default();
1687
1688 if event.topic == completion_topic {
1689 if index + 1 == total_events {
1690 self.state.completion_requested = true;
1691 self.diagnostics.log_orchestration(
1692 self.state.iteration,
1693 "jsonl",
1694 crate::diagnostics::OrchestrationEvent::EventPublished {
1695 topic: event.topic.clone(),
1696 },
1697 );
1698 info!(
1699 topic = %event.topic,
1700 "Completion event detected in JSONL"
1701 );
1702 } else {
1703 warn!(
1704 topic = %event.topic,
1705 index = index,
1706 total_events = total_events,
1707 "Completion event ignored because it was not the last event"
1708 );
1709 }
1710 continue;
1711 }
1712
1713 if event.topic == "build.done" {
1714 if let Some(evidence) = EventParser::parse_backpressure_evidence(&payload) {
1716 if evidence.all_passed() {
1717 self.warn_on_mutation_evidence(&evidence);
1718 validated_events.push(Event::new(event.topic.as_str(), &payload));
1719 } else {
1720 warn!(
1722 tests = evidence.tests_passed,
1723 lint = evidence.lint_passed,
1724 typecheck = evidence.typecheck_passed,
1725 audit = evidence.audit_passed,
1726 coverage = evidence.coverage_passed,
1727 complexity = evidence.complexity_score,
1728 duplication = evidence.duplication_passed,
1729 performance = evidence.performance_regression,
1730 specs = evidence.specs_verified,
1731 "build.done rejected: backpressure checks failed"
1732 );
1733
1734 let complexity = evidence
1735 .complexity_score
1736 .map(|value| format!("{value:.2}"))
1737 .unwrap_or_else(|| "missing".to_string());
1738 let performance = match evidence.performance_regression {
1739 Some(true) => "regression".to_string(),
1740 Some(false) => "pass".to_string(),
1741 None => "missing".to_string(),
1742 };
1743 let specs = match evidence.specs_verified {
1744 Some(true) => "pass".to_string(),
1745 Some(false) => "fail".to_string(),
1746 None => "not reported".to_string(),
1747 };
1748
1749 self.diagnostics.log_orchestration(
1750 self.state.iteration,
1751 "jsonl",
1752 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1753 reason: format!(
1754 "backpressure checks failed: tests={}, lint={}, typecheck={}, audit={}, coverage={}, complexity={}, duplication={}, performance={}, specs={}",
1755 evidence.tests_passed,
1756 evidence.lint_passed,
1757 evidence.typecheck_passed,
1758 evidence.audit_passed,
1759 evidence.coverage_passed,
1760 complexity,
1761 evidence.duplication_passed,
1762 performance,
1763 specs
1764 ),
1765 },
1766 );
1767
1768 validated_events.push(Event::new(
1769 "build.blocked",
1770 "Backpressure checks failed. Fix tests/lint/typecheck/audit/coverage/complexity/duplication/specs before emitting build.done.",
1771 ));
1772 }
1773 } else {
1774 warn!("build.done rejected: missing backpressure evidence");
1776
1777 self.diagnostics.log_orchestration(
1778 self.state.iteration,
1779 "jsonl",
1780 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1781 reason: "missing backpressure evidence".to_string(),
1782 },
1783 );
1784
1785 validated_events.push(Event::new(
1786 "build.blocked",
1787 "Missing backpressure evidence. Include 'tests: pass', 'lint: pass', 'typecheck: pass', 'audit: pass', 'coverage: pass', 'complexity: <score>', 'duplication: pass', 'performance: pass' (optional), 'specs: pass' (optional) in build.done payload.",
1788 ));
1789 }
1790 } else if event.topic == "review.done" {
1791 if let Some(evidence) = EventParser::parse_review_evidence(&payload) {
1793 if evidence.is_verified() {
1794 validated_events.push(Event::new(event.topic.as_str(), &payload));
1795 } else {
1796 warn!(
1798 tests = evidence.tests_passed,
1799 build = evidence.build_passed,
1800 "review.done rejected: verification checks failed"
1801 );
1802
1803 self.diagnostics.log_orchestration(
1804 self.state.iteration,
1805 "jsonl",
1806 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1807 reason: format!(
1808 "review verification failed: tests={}, build={}",
1809 evidence.tests_passed, evidence.build_passed
1810 ),
1811 },
1812 );
1813
1814 validated_events.push(Event::new(
1815 "review.blocked",
1816 "Review verification failed. Run tests and build before emitting review.done.",
1817 ));
1818 }
1819 } else {
1820 warn!("review.done rejected: missing verification evidence");
1822
1823 self.diagnostics.log_orchestration(
1824 self.state.iteration,
1825 "jsonl",
1826 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1827 reason: "missing review verification evidence".to_string(),
1828 },
1829 );
1830
1831 validated_events.push(Event::new(
1832 "review.blocked",
1833 "Missing verification evidence. Include 'tests: pass' and 'build: pass' in review.done payload.",
1834 ));
1835 }
1836 } else if event.topic == "verify.passed" {
1837 if let Some(report) = EventParser::parse_quality_report(&payload) {
1838 if report.meets_thresholds() {
1839 validated_events.push(Event::new(event.topic.as_str(), &payload));
1840 } else {
1841 let failed = report.failed_dimensions();
1842 let reason = if failed.is_empty() {
1843 "quality thresholds failed".to_string()
1844 } else {
1845 format!("quality thresholds failed: {}", failed.join(", "))
1846 };
1847
1848 warn!(
1849 failed_dimensions = ?failed,
1850 "verify.passed rejected: quality thresholds failed"
1851 );
1852
1853 self.diagnostics.log_orchestration(
1854 self.state.iteration,
1855 "jsonl",
1856 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1857 reason,
1858 },
1859 );
1860
1861 validated_events.push(Event::new(
1862 "verify.failed",
1863 "Quality thresholds failed. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity with thresholds in verify.passed payload.",
1864 ));
1865 }
1866 } else {
1867 warn!("verify.passed rejected: missing quality report");
1869
1870 self.diagnostics.log_orchestration(
1871 self.state.iteration,
1872 "jsonl",
1873 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1874 reason: "missing quality report".to_string(),
1875 },
1876 );
1877
1878 validated_events.push(Event::new(
1879 "verify.failed",
1880 "Missing quality report. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity in verify.passed payload.",
1881 ));
1882 }
1883 } else if event.topic == "verify.failed" {
1884 if EventParser::parse_quality_report(&payload).is_none() {
1885 warn!("verify.failed missing quality report");
1886 }
1887 validated_events.push(Event::new(event.topic.as_str(), &payload));
1888 } else {
1889 validated_events.push(Event::new(event.topic.as_str(), &payload));
1891 }
1892 }
1893
1894 let blocked_events: Vec<_> = validated_events
1896 .iter()
1897 .filter(|e| e.topic == "build.blocked".into())
1898 .collect();
1899
1900 for blocked_event in &blocked_events {
1901 let task_id = Self::extract_task_id(&blocked_event.payload);
1902
1903 let count = self
1904 .state
1905 .task_block_counts
1906 .entry(task_id.clone())
1907 .or_insert(0);
1908 *count += 1;
1909
1910 debug!(
1911 task_id = %task_id,
1912 block_count = *count,
1913 "Task blocked"
1914 );
1915
1916 if *count >= 3 && !self.state.abandoned_tasks.contains(&task_id) {
1918 warn!(
1919 task_id = %task_id,
1920 "Task abandoned after 3 consecutive blocks"
1921 );
1922
1923 self.state.abandoned_tasks.push(task_id.clone());
1924
1925 self.diagnostics.log_orchestration(
1926 self.state.iteration,
1927 "jsonl",
1928 crate::diagnostics::OrchestrationEvent::TaskAbandoned {
1929 reason: format!(
1930 "3 consecutive build.blocked events for task '{}'",
1931 task_id
1932 ),
1933 },
1934 );
1935
1936 let abandoned_event = Event::new(
1937 "build.task.abandoned",
1938 format!(
1939 "Task '{}' abandoned after 3 consecutive build.blocked events",
1940 task_id
1941 ),
1942 );
1943
1944 self.bus.publish(abandoned_event);
1945 }
1946 }
1947
1948 let has_blocked_event = !blocked_events.is_empty();
1950
1951 if has_blocked_event {
1952 self.state.consecutive_blocked += 1;
1953 } else {
1954 self.state.consecutive_blocked = 0;
1955 self.state.last_blocked_hat = None;
1956 }
1957
1958 let mut response_event = None;
1962 let ask_human_idx = validated_events
1963 .iter()
1964 .position(|e| e.topic == "human.interact".into());
1965
1966 if let Some(idx) = ask_human_idx {
1967 let ask_event = &validated_events[idx];
1968 let payload = ask_event.payload.clone();
1969
1970 if let Some(ref robot_service) = self.robot_service {
1971 info!(
1972 payload = %payload,
1973 "human.interact event detected — sending question via robot service"
1974 );
1975
1976 let send_ok = match robot_service.send_question(&payload) {
1978 Ok(_message_id) => true,
1979 Err(e) => {
1980 warn!(
1981 error = %e,
1982 "Failed to send human.interact question after retries — treating as timeout"
1983 );
1984 self.diagnostics.log_error(
1986 self.state.iteration,
1987 "telegram",
1988 crate::diagnostics::DiagnosticError::TelegramSendError {
1989 operation: "send_question".to_string(),
1990 error: e.to_string(),
1991 retry_count: 3,
1992 },
1993 );
1994 false
1995 }
1996 };
1997
1998 if send_ok {
2001 let events_path = self
2004 .loop_context
2005 .as_ref()
2006 .and_then(|ctx| {
2007 std::fs::read_to_string(ctx.current_events_marker())
2008 .ok()
2009 .map(|s| ctx.workspace().join(s.trim()))
2010 })
2011 .or_else(|| {
2012 std::fs::read_to_string(".ralph/current-events")
2013 .ok()
2014 .map(|s| PathBuf::from(s.trim()))
2015 })
2016 .unwrap_or_else(|| {
2017 self.loop_context
2018 .as_ref()
2019 .map(|ctx| ctx.events_path())
2020 .unwrap_or_else(|| PathBuf::from(".ralph/events.jsonl"))
2021 });
2022
2023 match robot_service.wait_for_response(&events_path) {
2024 Ok(Some(response)) => {
2025 info!(
2026 response = %response,
2027 "Received human.response — continuing loop"
2028 );
2029 response_event = Some(Event::new("human.response", &response));
2031 }
2032 Ok(None) => {
2033 warn!(
2034 timeout_secs = robot_service.timeout_secs(),
2035 "Human response timeout — continuing without response"
2036 );
2037 }
2038 Err(e) => {
2039 warn!(
2040 error = %e,
2041 "Error waiting for human response — continuing without response"
2042 );
2043 }
2044 }
2045 }
2046 } else {
2047 debug!(
2048 "human.interact event detected but no robot service active — passing through"
2049 );
2050 }
2051 }
2052
2053 for event in validated_events {
2058 self.diagnostics.log_orchestration(
2059 self.state.iteration,
2060 "jsonl",
2061 crate::diagnostics::OrchestrationEvent::EventPublished {
2062 topic: event.topic.to_string(),
2063 },
2064 );
2065
2066 if !self.registry.has_subscriber(event.topic.as_str()) {
2067 has_orphans = true;
2068 }
2069
2070 debug!(
2071 topic = %event.topic,
2072 "Publishing event from JSONL"
2073 );
2074 self.bus.publish(event);
2075 }
2076
2077 if let Some(response) = response_event {
2079 info!(
2080 topic = %response.topic,
2081 "Publishing human.response event from robot service"
2082 );
2083 self.bus.publish(response);
2084 }
2085
2086 Ok(has_orphans)
2087 }
2088
2089 pub fn check_ralph_completion(&self, output: &str) -> bool {
2093 let events = EventParser::new().parse(output);
2094 events
2095 .iter()
2096 .any(|event| event.topic.as_str() == self.config.event_loop.completion_promise)
2097 }
2098
2099 pub fn publish_terminate_event(&mut self, reason: &TerminationReason) -> Event {
2106 self.stop_robot_service();
2108
2109 let elapsed = self.state.elapsed();
2110 let duration_str = format_duration(elapsed);
2111
2112 let payload = format!(
2113 "## Reason\n{}\n\n## Status\n{}\n\n## Summary\n- Iterations: {}\n- Duration: {}\n- Exit code: {}",
2114 reason.as_str(),
2115 termination_status_text(reason),
2116 self.state.iteration,
2117 duration_str,
2118 reason.exit_code()
2119 );
2120
2121 let event = Event::new("loop.terminate", &payload);
2122
2123 self.bus.publish(event.clone());
2125
2126 info!(
2127 reason = %reason.as_str(),
2128 iterations = self.state.iteration,
2129 duration = %duration_str,
2130 "Wrapping up: {}. {} iterations in {}.",
2131 reason.as_str(),
2132 self.state.iteration,
2133 duration_str
2134 );
2135
2136 event
2137 }
2138
2139 pub fn robot_shutdown_flag(&self) -> Option<Arc<AtomicBool>> {
2144 self.robot_service.as_ref().map(|s| s.shutdown_flag())
2145 }
2146
2147 fn stop_robot_service(&mut self) {
2151 if let Some(service) = self.robot_service.take() {
2152 service.stop();
2153 }
2154 }
2155
2156 pub fn check_for_user_prompt(&self, events: &[Event]) -> Option<UserPrompt> {
2164 events
2165 .iter()
2166 .find(|e| e.topic.as_str() == "user.prompt")
2167 .map(|e| UserPrompt {
2168 id: Self::extract_prompt_id(&e.payload),
2169 text: e.payload.clone(),
2170 })
2171 }
2172
2173 fn extract_prompt_id(payload: &str) -> String {
2178 if let Some(start) = payload.find("id=\"")
2180 && let Some(end) = payload[start + 4..].find('"')
2181 {
2182 return payload[start + 4..start + 4 + end].to_string();
2183 }
2184
2185 format!("q{}", Self::generate_prompt_id())
2187 }
2188
2189 fn generate_prompt_id() -> String {
2192 use std::time::{SystemTime, UNIX_EPOCH};
2193 let nanos = SystemTime::now()
2194 .duration_since(UNIX_EPOCH)
2195 .unwrap()
2196 .as_nanos();
2197 format!("{:x}", nanos % 0xFFFF_FFFF)
2198 }
2199}
2200
/// A prompt addressed to the user, built from a `user.prompt` event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UserPrompt {
    /// Identifier of the prompt: taken from the payload's `id="..."` attribute
    /// when present, otherwise generated.
    pub id: String,
    /// Full payload text of the originating event.
    pub text: String,
}
2211
/// Renders a duration as `"{h}h {m}m {s}s"`, `"{m}m {s}s"` or `"{s}s"`,
/// dropping leading units that are zero. Sub-second precision is discarded.
fn format_duration(d: Duration) -> String {
    let whole_secs = d.as_secs();
    let (hours, minutes, seconds) = (whole_secs / 3600, whole_secs / 60 % 60, whole_secs % 60);

    match (hours, minutes) {
        (0, 0) => format!("{}s", seconds),
        (0, _) => format!("{}m {}s", minutes, seconds),
        _ => format!("{}h {}m {}s", hours, minutes, seconds),
    }
}
2227
2228fn termination_status_text(reason: &TerminationReason) -> &'static str {
2230 match reason {
2231 TerminationReason::CompletionPromise => "All tasks completed successfully.",
2232 TerminationReason::MaxIterations => "Stopped at iteration limit.",
2233 TerminationReason::MaxRuntime => "Stopped at runtime limit.",
2234 TerminationReason::MaxCost => "Stopped at cost limit.",
2235 TerminationReason::ConsecutiveFailures => "Too many consecutive failures.",
2236 TerminationReason::LoopThrashing => {
2237 "Loop thrashing detected - same hat repeatedly blocked."
2238 }
2239 TerminationReason::ValidationFailure => "Too many consecutive malformed JSONL events.",
2240 TerminationReason::Stopped => "Manually stopped.",
2241 TerminationReason::Interrupted => "Interrupted by signal.",
2242 TerminationReason::RestartRequested => "Restarting by human request.",
2243 }
2244}