1mod loop_state;
6#[cfg(test)]
7mod tests;
8
9pub use loop_state::LoopState;
10
11use crate::config::{HatBackend, InjectMode, RalphConfig};
12use crate::event_parser::{EventParser, MutationEvidence, MutationStatus};
13use crate::event_reader::EventReader;
14use crate::hat_registry::HatRegistry;
15use crate::hatless_ralph::HatlessRalph;
16use crate::instructions::InstructionBuilder;
17use crate::loop_context::LoopContext;
18use crate::memory_store::{MarkdownMemoryStore, format_memories_as_markdown, truncate_to_budget};
19use crate::skill_registry::SkillRegistry;
20use crate::text::floor_char_boundary;
21use ralph_proto::{CheckinContext, Event, EventBus, Hat, HatId, RobotService};
22use std::path::PathBuf;
23use std::sync::Arc;
24use std::sync::atomic::AtomicBool;
25use std::time::Duration;
26use tracing::{debug, info, warn};
27
/// Why the orchestration loop stopped.
///
/// Every variant maps to a process exit code ([`TerminationReason::exit_code`])
/// and to a stable snake_case label ([`TerminationReason::as_str`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationReason {
    /// The completion event was accepted.
    CompletionPromise,
    /// `iteration` reached the configured `max_iterations`.
    MaxIterations,
    /// Elapsed wall-clock time reached `max_runtime_seconds`.
    MaxRuntime,
    /// Cumulative cost reached the configured `max_cost_usd`.
    MaxCost,
    /// `consecutive_failures` reached `max_consecutive_failures`.
    ConsecutiveFailures,
    /// Abandoned tasks were re-dispatched too many times.
    LoopThrashing,
    /// Too many consecutive malformed events were seen.
    ValidationFailure,
    /// A stop was requested via the `.ralph/stop-requested` marker.
    Stopped,
    /// The run was interrupted.
    Interrupted,
    /// A restart was requested via the `.ralph/restart-requested` marker.
    RestartRequested,
}

impl TerminationReason {
    /// Process exit code for this reason.
    ///
    /// * `0`   – completed successfully
    /// * `1`   – failure or explicit stop
    /// * `2`   – a configured limit was hit
    /// * `3`   – restart requested
    /// * `130` – interrupted (the shell convention for SIGINT, 128 + 2)
    pub fn exit_code(&self) -> i32 {
        match self {
            Self::CompletionPromise => 0,
            Self::ConsecutiveFailures
            | Self::LoopThrashing
            | Self::ValidationFailure
            | Self::Stopped => 1,
            Self::MaxIterations | Self::MaxRuntime | Self::MaxCost => 2,
            Self::RestartRequested => 3,
            Self::Interrupted => 130,
        }
    }

    /// Stable snake_case label for logs and reports.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::CompletionPromise => "completed",
            Self::MaxIterations => "max_iterations",
            Self::MaxRuntime => "max_runtime",
            Self::MaxCost => "max_cost",
            Self::ConsecutiveFailures => "consecutive_failures",
            Self::LoopThrashing => "loop_thrashing",
            Self::ValidationFailure => "validation_failure",
            Self::Stopped => "stopped",
            Self::Interrupted => "interrupted",
            Self::RestartRequested => "restart_requested",
        }
    }

    /// `true` only for [`TerminationReason::CompletionPromise`].
    pub fn is_success(&self) -> bool {
        *self == Self::CompletionPromise
    }
}
101
/// Orchestrates a Ralph run: routes events between hats over an event bus,
/// builds per-iteration prompts, and decides when the loop terminates.
pub struct EventLoop {
    /// Configuration the loop was created with.
    config: RalphConfig,
    /// Custom hats built from `config`; empty means solo mode.
    registry: HatRegistry,
    /// Event bus; the constructors register every registry hat plus a
    /// catch-all (`"*"`) `ralph` hat.
    bus: EventBus,
    /// Mutable loop state (iteration count, cost, timers, counters).
    state: LoopState,
    /// Builds prompts for custom (non-Ralph) hats.
    instruction_builder: InstructionBuilder,
    /// Coordinator prompt builder used whenever `ralph` is the active hat.
    ralph: HatlessRalph,
    /// Payloads of `human.guidance` events received so far; applied to
    /// `ralph` when coordinator prompts are built.
    robot_guidance: Vec<String>,
    /// Reader over the events file (path resolved by the constructors).
    pub(crate) event_reader: EventReader,
    /// Orchestration diagnostics sink (may be a disabled collector).
    diagnostics: crate::diagnostics::DiagnosticsCollector,
    /// `Some` when built via `with_context*`; supplies the task and
    /// scratchpad paths instead of the defaults.
    loop_context: Option<LoopContext>,
    /// Skills available for auto-injection into Ralph's prompt.
    skill_registry: SkillRegistry,
    /// Optional robot service used for periodic check-ins.
    robot_service: Option<Box<dyn RobotService>>,
}
124
125impl EventLoop {
126 pub fn new(config: RalphConfig) -> Self {
128 let diagnostics = crate::diagnostics::DiagnosticsCollector::new(std::path::Path::new("."))
131 .unwrap_or_else(|e| {
132 debug!(
133 "Failed to initialize diagnostics: {}, using disabled collector",
134 e
135 );
136 crate::diagnostics::DiagnosticsCollector::disabled()
137 });
138
139 Self::with_diagnostics(config, diagnostics)
140 }
141
142 pub fn with_context(config: RalphConfig, context: LoopContext) -> Self {
148 let diagnostics = crate::diagnostics::DiagnosticsCollector::new(context.workspace())
149 .unwrap_or_else(|e| {
150 debug!(
151 "Failed to initialize diagnostics: {}, using disabled collector",
152 e
153 );
154 crate::diagnostics::DiagnosticsCollector::disabled()
155 });
156
157 Self::with_context_and_diagnostics(config, context, diagnostics)
158 }
159
160 pub fn with_context_and_diagnostics(
162 config: RalphConfig,
163 context: LoopContext,
164 diagnostics: crate::diagnostics::DiagnosticsCollector,
165 ) -> Self {
166 let registry = HatRegistry::from_config(&config);
167 let instruction_builder =
168 InstructionBuilder::with_events(config.core.clone(), config.events.clone());
169
170 let mut bus = EventBus::new();
171
172 for hat in registry.all() {
176 bus.register(hat.clone());
177 }
178
179 let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); bus.register(ralph_hat);
183
184 if registry.is_empty() {
185 debug!("Solo mode: Ralph is the only coordinator");
186 } else {
187 debug!(
188 "Multi-hat mode: {} custom hats + Ralph as fallback",
189 registry.len()
190 );
191 }
192
193 let skill_registry = if config.skills.enabled {
195 SkillRegistry::from_config(
196 &config.skills,
197 context.workspace(),
198 Some(config.cli.backend.as_str()),
199 )
200 .unwrap_or_else(|e| {
201 warn!(
202 "Failed to build skill registry: {}, using empty registry",
203 e
204 );
205 SkillRegistry::new(Some(config.cli.backend.as_str()))
206 })
207 } else {
208 SkillRegistry::new(Some(config.cli.backend.as_str()))
209 };
210
211 let skill_index = if config.skills.enabled {
212 skill_registry.build_index(None)
213 } else {
214 String::new()
215 };
216
217 let ralph = HatlessRalph::new(
219 config.event_loop.completion_promise.clone(),
220 config.core.clone(),
221 ®istry,
222 config.event_loop.starting_event.clone(),
223 )
224 .with_memories_enabled(config.memories.enabled)
225 .with_skill_index(skill_index);
226
227 let events_path = std::fs::read_to_string(context.current_events_marker())
231 .map(|s| {
232 let relative = s.trim();
233 context.workspace().join(relative)
234 })
235 .unwrap_or_else(|_| context.events_path());
236 let event_reader = EventReader::new(&events_path);
237
238 Self {
239 config,
240 registry,
241 bus,
242 state: LoopState::new(),
243 instruction_builder,
244 ralph,
245 robot_guidance: Vec::new(),
246 event_reader,
247 diagnostics,
248 loop_context: Some(context),
249 skill_registry,
250 robot_service: None,
251 }
252 }
253
254 pub fn with_diagnostics(
256 config: RalphConfig,
257 diagnostics: crate::diagnostics::DiagnosticsCollector,
258 ) -> Self {
259 let registry = HatRegistry::from_config(&config);
260 let instruction_builder =
261 InstructionBuilder::with_events(config.core.clone(), config.events.clone());
262
263 let mut bus = EventBus::new();
264
265 for hat in registry.all() {
269 bus.register(hat.clone());
270 }
271
272 let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); bus.register(ralph_hat);
276
277 if registry.is_empty() {
278 debug!("Solo mode: Ralph is the only coordinator");
279 } else {
280 debug!(
281 "Multi-hat mode: {} custom hats + Ralph as fallback",
282 registry.len()
283 );
284 }
285
286 let workspace_root = std::path::Path::new(".");
288 let skill_registry = if config.skills.enabled {
289 SkillRegistry::from_config(
290 &config.skills,
291 workspace_root,
292 Some(config.cli.backend.as_str()),
293 )
294 .unwrap_or_else(|e| {
295 warn!(
296 "Failed to build skill registry: {}, using empty registry",
297 e
298 );
299 SkillRegistry::new(Some(config.cli.backend.as_str()))
300 })
301 } else {
302 SkillRegistry::new(Some(config.cli.backend.as_str()))
303 };
304
305 let skill_index = if config.skills.enabled {
306 skill_registry.build_index(None)
307 } else {
308 String::new()
309 };
310
311 let ralph = HatlessRalph::new(
313 config.event_loop.completion_promise.clone(),
314 config.core.clone(),
315 ®istry,
316 config.event_loop.starting_event.clone(),
317 )
318 .with_memories_enabled(config.memories.enabled)
319 .with_skill_index(skill_index);
320
321 let events_path = std::fs::read_to_string(".ralph/current-events")
324 .map(|s| s.trim().to_string())
325 .unwrap_or_else(|_| ".ralph/events.jsonl".to_string());
326 let event_reader = EventReader::new(&events_path);
327
328 Self {
329 config,
330 registry,
331 bus,
332 state: LoopState::new(),
333 instruction_builder,
334 ralph,
335 robot_guidance: Vec::new(),
336 event_reader,
337 diagnostics,
338 loop_context: None,
339 skill_registry,
340 robot_service: None,
341 }
342 }
343
344 pub fn set_robot_service(&mut self, service: Box<dyn RobotService>) {
352 self.robot_service = Some(service);
353 }
354
355 pub fn loop_context(&self) -> Option<&LoopContext> {
357 self.loop_context.as_ref()
358 }
359
360 fn tasks_path(&self) -> PathBuf {
362 self.loop_context
363 .as_ref()
364 .map(|ctx| ctx.tasks_path())
365 .unwrap_or_else(|| PathBuf::from(".ralph/agent/tasks.jsonl"))
366 }
367
368 fn scratchpad_path(&self) -> PathBuf {
370 self.loop_context
371 .as_ref()
372 .map(|ctx| ctx.scratchpad_path())
373 .unwrap_or_else(|| PathBuf::from(&self.config.core.scratchpad))
374 }
375
376 pub fn state(&self) -> &LoopState {
378 &self.state
379 }
380
381 pub fn config(&self) -> &RalphConfig {
383 &self.config
384 }
385
386 pub fn registry(&self) -> &HatRegistry {
388 &self.registry
389 }
390
391 pub fn get_hat_backend(&self, hat_id: &HatId) -> Option<&HatBackend> {
396 self.registry
397 .get_config(hat_id)
398 .and_then(|config| config.backend.as_ref())
399 }
400
401 pub fn add_observer<F>(&mut self, observer: F)
406 where
407 F: Fn(&Event) + Send + 'static,
408 {
409 self.bus.add_observer(observer);
410 }
411
412 #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
416 pub fn set_observer<F>(&mut self, observer: F)
417 where
418 F: Fn(&Event) + Send + 'static,
419 {
420 #[allow(deprecated)]
421 self.bus.set_observer(observer);
422 }
423
424 pub fn check_termination(&self) -> Option<TerminationReason> {
426 let cfg = &self.config.event_loop;
427
428 if self.state.iteration >= cfg.max_iterations {
429 return Some(TerminationReason::MaxIterations);
430 }
431
432 if self.state.elapsed().as_secs() >= cfg.max_runtime_seconds {
433 return Some(TerminationReason::MaxRuntime);
434 }
435
436 if let Some(max_cost) = cfg.max_cost_usd
437 && self.state.cumulative_cost >= max_cost
438 {
439 return Some(TerminationReason::MaxCost);
440 }
441
442 if self.state.consecutive_failures >= cfg.max_consecutive_failures {
443 return Some(TerminationReason::ConsecutiveFailures);
444 }
445
446 if self.state.abandoned_task_redispatches >= 3 {
448 return Some(TerminationReason::LoopThrashing);
449 }
450
451 if self.state.consecutive_malformed_events >= 3 {
453 return Some(TerminationReason::ValidationFailure);
454 }
455
456 let stop_path =
458 std::path::Path::new(&self.config.core.workspace_root).join(".ralph/stop-requested");
459 if stop_path.exists() {
460 let _ = std::fs::remove_file(&stop_path);
461 return Some(TerminationReason::Stopped);
462 }
463
464 let restart_path =
466 std::path::Path::new(&self.config.core.workspace_root).join(".ralph/restart-requested");
467 if restart_path.exists() {
468 return Some(TerminationReason::RestartRequested);
469 }
470
471 None
472 }
473
474 pub fn check_completion_event(&mut self) -> Option<TerminationReason> {
478 if !self.state.completion_requested {
479 return None;
480 }
481
482 self.state.completion_requested = false;
483
484 if self.config.event_loop.persistent {
486 info!("Completion event suppressed - persistent mode active, loop staying alive");
487
488 self.diagnostics.log_orchestration(
489 self.state.iteration,
490 "loop",
491 crate::diagnostics::OrchestrationEvent::LoopTerminated {
492 reason: "completion_event_suppressed_persistent".to_string(),
493 },
494 );
495
496 let resume_event = Event::new(
498 "task.resume",
499 "Persistent mode: loop staying alive after completion signal. \
500 Check for new tasks or await human guidance.",
501 );
502 self.bus.publish(resume_event);
503
504 return None;
505 }
506
507 if self.config.memories.enabled {
509 if let Ok(false) = self.verify_tasks_complete() {
510 let open_tasks = self.get_open_task_list();
511 warn!(
512 open_tasks = ?open_tasks,
513 "Completion event with {} open task(s) - trusting agent decision",
514 open_tasks.len()
515 );
516 }
517 } else if let Ok(false) = self.verify_scratchpad_complete() {
518 warn!("Completion event with pending scratchpad tasks - trusting agent decision");
519 }
520
521 info!("Completion event detected - terminating");
522
523 self.diagnostics.log_orchestration(
525 self.state.iteration,
526 "loop",
527 crate::diagnostics::OrchestrationEvent::LoopTerminated {
528 reason: "completion_event".to_string(),
529 },
530 );
531
532 Some(TerminationReason::CompletionPromise)
533 }
534
535 pub fn initialize(&mut self, prompt_content: &str) {
537 let topic = self
539 .config
540 .event_loop
541 .starting_event
542 .clone()
543 .unwrap_or_else(|| "task.start".to_string());
544 self.initialize_with_topic(&topic, prompt_content);
545 }
546
547 pub fn initialize_resume(&mut self, prompt_content: &str) {
552 self.initialize_with_topic("task.resume", prompt_content);
554 }
555
556 fn initialize_with_topic(&mut self, topic: &str, prompt_content: &str) {
558 self.ralph.set_objective(prompt_content.to_string());
562
563 let start_event = Event::new(topic, prompt_content);
564 self.bus.publish(start_event);
565 debug!(topic = topic, "Published {} event", topic);
566 }
567
568 pub fn next_hat(&self) -> Option<&HatId> {
577 let next = self.bus.next_hat_with_pending();
578
579 if next.is_none() && self.bus.has_human_pending() {
581 return self.bus.hat_ids().find(|id| id.as_str() == "ralph");
582 }
583
584 next.as_ref()?;
586
587 if self.registry.is_empty() {
590 next
592 } else {
593 self.bus.hat_ids().find(|id| id.as_str() == "ralph")
596 }
597 }
598
599 pub fn has_pending_events(&self) -> bool {
604 self.bus.next_hat_with_pending().is_some() || self.bus.has_human_pending()
605 }
606
607 pub fn has_pending_human_events(&self) -> bool {
612 self.bus.has_human_pending()
613 }
614
615 pub fn get_hat_publishes(&self, hat_id: &HatId) -> Vec<String> {
619 self.registry
620 .get(hat_id)
621 .map(|hat| hat.publishes.iter().map(|t| t.to_string()).collect())
622 .unwrap_or_default()
623 }
624
625 pub fn inject_fallback_event(&mut self) -> bool {
632 let fallback_event = Event::new(
633 "task.resume",
634 "RECOVERY: Previous iteration did not publish an event. \
635 Review the scratchpad and either dispatch the next task or complete the loop.",
636 );
637
638 let fallback_event = match &self.state.last_hat {
641 Some(hat_id) if hat_id.as_str() != "ralph" => {
642 debug!(
643 hat = %hat_id.as_str(),
644 "Injecting fallback event to recover - targeting last hat with task.resume"
645 );
646 fallback_event.with_target(hat_id.clone())
647 }
648 _ => {
649 debug!("Injecting fallback event to recover - triggering Ralph with task.resume");
650 fallback_event
651 }
652 };
653
654 self.bus.publish(fallback_event);
655 true
656 }
657
658 pub fn build_prompt(&mut self, hat_id: &HatId) -> Option<String> {
672 if hat_id.as_str() == "ralph" {
675 if self.registry.is_empty() {
676 let mut events = self.bus.take_pending(&hat_id.clone());
678 let mut human_events = self.bus.take_human_pending();
679 events.append(&mut human_events);
680
681 let (guidance_events, regular_events): (Vec<_>, Vec<_>) = events
683 .into_iter()
684 .partition(|e| e.topic.as_str() == "human.guidance");
685
686 let events_context = regular_events
687 .iter()
688 .map(|e| Self::format_event(e))
689 .collect::<Vec<_>>()
690 .join("\n");
691
692 self.update_robot_guidance(guidance_events);
694 self.apply_robot_guidance();
695
696 let base_prompt = self.ralph.build_prompt(&events_context, &[]);
698 self.ralph.clear_robot_guidance();
699 let with_skills = self.prepend_auto_inject_skills(base_prompt);
700 let with_scratchpad = self.prepend_scratchpad(with_skills);
701 let final_prompt = self.prepend_ready_tasks(with_scratchpad);
702
703 debug!("build_prompt: routing to HatlessRalph (solo mode)");
704 return Some(final_prompt);
705 } else {
706 let mut all_hat_ids: Vec<HatId> = self.bus.hat_ids().cloned().collect();
708 all_hat_ids.sort_by(|a, b| a.as_str().cmp(b.as_str()));
710
711 let mut all_events = Vec::new();
712 let mut system_events = Vec::new();
713
714 for id in &all_hat_ids {
715 let pending = self.bus.take_pending(id);
716 if pending.is_empty() {
717 continue;
718 }
719
720 let (drop_pending, exhausted_event) = self.check_hat_exhaustion(id, &pending);
721 if drop_pending {
722 if let Some(exhausted_event) = exhausted_event {
724 all_events.push(exhausted_event.clone());
725 system_events.push(exhausted_event);
726 }
727 continue;
728 }
729
730 all_events.extend(pending);
731 }
732
733 let mut human_events = self.bus.take_human_pending();
734 all_events.append(&mut human_events);
735
736 for event in system_events {
739 self.bus.publish(event);
740 }
741
742 let (guidance_events, regular_events): (Vec<_>, Vec<_>) = all_events
744 .into_iter()
745 .partition(|e| e.topic.as_str() == "human.guidance");
746
747 self.update_robot_guidance(guidance_events);
750 self.apply_robot_guidance();
751
752 let active_hat_ids = self.determine_active_hat_ids(®ular_events);
754 self.record_hat_activations(&active_hat_ids);
755 let active_hats = self.determine_active_hats(®ular_events);
756
757 let events_context = regular_events
759 .iter()
760 .map(|e| Self::format_event(e))
761 .collect::<Vec<_>>()
762 .join("\n");
763
764 let base_prompt = self.ralph.build_prompt(&events_context, &active_hats);
766
767 debug!(
769 "build_prompt: routing to HatlessRalph (multi-hat coordinator mode), active_hats: {:?}",
770 active_hats
771 .iter()
772 .map(|h| h.id.as_str())
773 .collect::<Vec<_>>()
774 );
775
776 self.ralph.clear_robot_guidance();
778 let with_skills = self.prepend_auto_inject_skills(base_prompt);
779 let with_scratchpad = self.prepend_scratchpad(with_skills);
780 let final_prompt = self.prepend_ready_tasks(with_scratchpad);
781
782 return Some(final_prompt);
783 }
784 }
785
786 let events = self.bus.take_pending(&hat_id.clone());
790 let events_context = events
791 .iter()
792 .map(|e| Self::format_event(e))
793 .collect::<Vec<_>>()
794 .join("\n");
795
796 let hat = self.registry.get(hat_id)?;
797
798 debug!(
800 "build_prompt: hat_id='{}', instructions.is_empty()={}",
801 hat_id.as_str(),
802 hat.instructions.is_empty()
803 );
804
805 debug!(
807 "build_prompt: routing to build_custom_hat() for '{}'",
808 hat_id.as_str()
809 );
810 Some(
811 self.instruction_builder
812 .build_custom_hat(hat, &events_context),
813 )
814 }
815
816 fn update_robot_guidance(&mut self, guidance_events: Vec<Event>) {
822 if guidance_events.is_empty() {
823 return;
824 }
825
826 self.persist_guidance_to_scratchpad(&guidance_events);
828
829 self.robot_guidance
830 .extend(guidance_events.into_iter().map(|e| e.payload));
831 }
832
833 fn persist_guidance_to_scratchpad(&self, guidance_events: &[Event]) {
838 use std::io::Write;
839
840 let scratchpad_path = self.scratchpad_path();
841 let resolved_path = if scratchpad_path.is_relative() {
842 self.config.core.workspace_root.join(&scratchpad_path)
843 } else {
844 scratchpad_path
845 };
846
847 if let Some(parent) = resolved_path.parent()
849 && !parent.exists()
850 && let Err(e) = std::fs::create_dir_all(parent)
851 {
852 warn!("Failed to create scratchpad directory: {}", e);
853 return;
854 }
855
856 let mut file = match std::fs::OpenOptions::new()
857 .create(true)
858 .append(true)
859 .open(&resolved_path)
860 {
861 Ok(f) => f,
862 Err(e) => {
863 warn!("Failed to open scratchpad for guidance persistence: {}", e);
864 return;
865 }
866 };
867
868 let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
869 for event in guidance_events {
870 let entry = format!(
871 "\n### HUMAN GUIDANCE ({})\n\n{}\n",
872 timestamp, event.payload
873 );
874 if let Err(e) = file.write_all(entry.as_bytes()) {
875 warn!("Failed to write guidance to scratchpad: {}", e);
876 }
877 }
878
879 info!(
880 count = guidance_events.len(),
881 "Persisted human guidance to scratchpad"
882 );
883 }
884
885 fn apply_robot_guidance(&mut self) {
887 if self.robot_guidance.is_empty() {
888 return;
889 }
890
891 self.ralph.set_robot_guidance(self.robot_guidance.clone());
892 }
893
894 fn prepend_auto_inject_skills(&self, prompt: String) -> String {
904 let mut prefix = String::new();
905
906 self.inject_memories_and_tools_skill(&mut prefix);
908
909 self.inject_robot_skill(&mut prefix);
911
912 self.inject_custom_auto_skills(&mut prefix);
914
915 if prefix.is_empty() {
916 return prompt;
917 }
918
919 prefix.push_str("\n\n");
920 prefix.push_str(&prompt);
921 prefix
922 }
923
924 fn inject_memories_and_tools_skill(&self, prefix: &mut String) {
932 let memories_config = &self.config.memories;
933
934 if memories_config.enabled && memories_config.inject == InjectMode::Auto {
936 info!(
937 "Memory injection check: enabled={}, inject={:?}, workspace_root={:?}",
938 memories_config.enabled, memories_config.inject, self.config.core.workspace_root
939 );
940
941 let workspace_root = &self.config.core.workspace_root;
942 let store = MarkdownMemoryStore::with_default_path(workspace_root);
943 let memories_path = workspace_root.join(".ralph/agent/memories.md");
944
945 info!(
946 "Looking for memories at: {:?} (exists: {})",
947 memories_path,
948 memories_path.exists()
949 );
950
951 let memories = match store.load() {
952 Ok(memories) => {
953 info!("Successfully loaded {} memories from store", memories.len());
954 memories
955 }
956 Err(e) => {
957 info!(
958 "Failed to load memories for injection: {} (path: {:?})",
959 e, memories_path
960 );
961 Vec::new()
962 }
963 };
964
965 if memories.is_empty() {
966 info!("Memory store is empty - no memories to inject");
967 } else {
968 let mut memories_content = format_memories_as_markdown(&memories);
969
970 if memories_config.budget > 0 {
971 let original_len = memories_content.len();
972 memories_content =
973 truncate_to_budget(&memories_content, memories_config.budget);
974 debug!(
975 "Applied budget: {} chars -> {} chars (budget: {})",
976 original_len,
977 memories_content.len(),
978 memories_config.budget
979 );
980 }
981
982 info!(
983 "Injecting {} memories ({} chars) into prompt",
984 memories.len(),
985 memories_content.len()
986 );
987
988 prefix.push_str(&memories_content);
989 }
990 }
991
992 if memories_config.enabled || self.config.tasks.enabled {
994 if let Some(skill) = self.skill_registry.get("ralph-tools") {
995 if !prefix.is_empty() {
996 prefix.push_str("\n\n");
997 }
998 prefix.push_str(&format!(
999 "<ralph-tools-skill>\n{}\n</ralph-tools-skill>",
1000 skill.content.trim()
1001 ));
1002 debug!("Injected ralph-tools skill from registry");
1003 } else {
1004 debug!("ralph-tools skill not found in registry - skill content not injected");
1005 }
1006 }
1007 }
1008
1009 fn inject_robot_skill(&self, prefix: &mut String) {
1014 if !self.config.robot.enabled {
1015 return;
1016 }
1017
1018 if let Some(skill) = self.skill_registry.get("robot-interaction") {
1019 if !prefix.is_empty() {
1020 prefix.push_str("\n\n");
1021 }
1022 prefix.push_str(&format!(
1023 "<robot-skill>\n{}\n</robot-skill>",
1024 skill.content.trim()
1025 ));
1026 debug!("Injected robot interaction skill from registry");
1027 }
1028 }
1029
1030 fn inject_custom_auto_skills(&self, prefix: &mut String) {
1032 for skill in self.skill_registry.auto_inject_skills(None) {
1033 if skill.name == "ralph-tools" || skill.name == "robot-interaction" {
1035 continue;
1036 }
1037
1038 if !prefix.is_empty() {
1039 prefix.push_str("\n\n");
1040 }
1041 prefix.push_str(&format!(
1042 "<{name}-skill>\n{content}\n</{name}-skill>",
1043 name = skill.name,
1044 content = skill.content.trim()
1045 ));
1046 debug!("Injected auto-inject skill: {}", skill.name);
1047 }
1048 }
1049
1050 fn prepend_scratchpad(&self, prompt: String) -> String {
1056 let scratchpad_path = self.scratchpad_path();
1057
1058 let resolved_path = if scratchpad_path.is_relative() {
1059 self.config.core.workspace_root.join(&scratchpad_path)
1060 } else {
1061 scratchpad_path
1062 };
1063
1064 if !resolved_path.exists() {
1065 debug!(
1066 "Scratchpad not found at {:?}, skipping injection",
1067 resolved_path
1068 );
1069 return prompt;
1070 }
1071
1072 let content = match std::fs::read_to_string(&resolved_path) {
1073 Ok(c) => c,
1074 Err(e) => {
1075 info!("Failed to read scratchpad for injection: {}", e);
1076 return prompt;
1077 }
1078 };
1079
1080 if content.trim().is_empty() {
1081 debug!("Scratchpad is empty, skipping injection");
1082 return prompt;
1083 }
1084
1085 let char_budget = 4000 * 4;
1087 let content = if content.len() > char_budget {
1088 let start = content.len() - char_budget;
1090 let start = floor_char_boundary(&content, start);
1092 let line_start = content[start..].find('\n').map_or(start, |n| start + n + 1);
1093 let discarded = &content[..line_start];
1094
1095 let headings: Vec<&str> = discarded
1097 .lines()
1098 .filter(|line| line.starts_with('#'))
1099 .collect();
1100 let summary = if headings.is_empty() {
1101 format!(
1102 "<!-- earlier content truncated ({} chars omitted) -->",
1103 line_start
1104 )
1105 } else {
1106 format!(
1107 "<!-- earlier content truncated ({} chars omitted) -->\n\
1108 <!-- discarded sections: {} -->",
1109 line_start,
1110 headings.join(" | ")
1111 )
1112 };
1113
1114 format!("{}\n\n{}", summary, &content[line_start..])
1115 } else {
1116 content
1117 };
1118
1119 info!("Injecting scratchpad ({} chars) into prompt", content.len());
1120
1121 let mut final_prompt = format!(
1122 "<scratchpad path=\"{}\">\n{}\n</scratchpad>\n\n",
1123 self.config.core.scratchpad, content
1124 );
1125 final_prompt.push_str(&prompt);
1126 final_prompt
1127 }
1128
1129 fn prepend_ready_tasks(&self, prompt: String) -> String {
1135 if !self.config.tasks.enabled {
1136 return prompt;
1137 }
1138
1139 use crate::task::TaskStatus;
1140 use crate::task_store::TaskStore;
1141
1142 let tasks_path = self.tasks_path();
1143 let resolved_path = if tasks_path.is_relative() {
1144 self.config.core.workspace_root.join(&tasks_path)
1145 } else {
1146 tasks_path
1147 };
1148
1149 if !resolved_path.exists() {
1150 return prompt;
1151 }
1152
1153 let store = match TaskStore::load(&resolved_path) {
1154 Ok(s) => s,
1155 Err(e) => {
1156 info!("Failed to load task store for injection: {}", e);
1157 return prompt;
1158 }
1159 };
1160
1161 let ready = store.ready();
1162 let open = store.open();
1163 let closed_count = store.all().len() - open.len();
1164
1165 if open.is_empty() && closed_count == 0 {
1166 return prompt;
1167 }
1168
1169 let mut section = String::from("<ready-tasks>\n");
1170 if ready.is_empty() && open.is_empty() {
1171 section.push_str("No open tasks. Create tasks with `ralph tools task add`.\n");
1172 } else {
1173 section.push_str(&format!(
1174 "## Tasks: {} ready, {} open, {} closed\n\n",
1175 ready.len(),
1176 open.len(),
1177 closed_count
1178 ));
1179 for task in &ready {
1180 let status_icon = match task.status {
1181 TaskStatus::Open => "[ ]",
1182 TaskStatus::InProgress => "[~]",
1183 _ => "[?]",
1184 };
1185 section.push_str(&format!(
1186 "- {} [P{}] {} ({})\n",
1187 status_icon, task.priority, task.title, task.id
1188 ));
1189 }
1190 let ready_ids: Vec<&str> = ready.iter().map(|t| t.id.as_str()).collect();
1192 let blocked: Vec<_> = open
1193 .iter()
1194 .filter(|t| !ready_ids.contains(&t.id.as_str()))
1195 .collect();
1196 if !blocked.is_empty() {
1197 section.push_str("\nBlocked:\n");
1198 for task in blocked {
1199 section.push_str(&format!(
1200 "- [blocked] [P{}] {} ({}) — blocked by: {}\n",
1201 task.priority,
1202 task.title,
1203 task.id,
1204 task.blocked_by.join(", ")
1205 ));
1206 }
1207 }
1208 }
1209 section.push_str("</ready-tasks>\n\n");
1210
1211 info!(
1212 "Injecting ready tasks ({} ready, {} open, {} closed) into prompt",
1213 ready.len(),
1214 open.len(),
1215 closed_count
1216 );
1217
1218 let mut final_prompt = section;
1219 final_prompt.push_str(&prompt);
1220 final_prompt
1221 }
1222
1223 pub fn build_ralph_prompt(&self, prompt_content: &str) -> String {
1225 self.ralph.build_prompt(prompt_content, &[])
1226 }
1227
1228 fn determine_active_hats(&self, events: &[Event]) -> Vec<&Hat> {
1231 let mut active_hats = Vec::new();
1232 for id in self.determine_active_hat_ids(events) {
1233 if let Some(hat) = self.registry.get(&id) {
1234 active_hats.push(hat);
1235 }
1236 }
1237 active_hats
1238 }
1239
1240 fn determine_active_hat_ids(&self, events: &[Event]) -> Vec<HatId> {
1241 let mut active_hat_ids = Vec::new();
1242 for event in events {
1243 if let Some(hat) = self.registry.get_for_topic(event.topic.as_str()) {
1244 if !active_hat_ids.iter().any(|id| id == &hat.id) {
1246 active_hat_ids.push(hat.id.clone());
1247 }
1248 }
1249 }
1250 active_hat_ids
1251 }
1252
1253 fn format_event(event: &Event) -> String {
1258 let topic = &event.topic;
1259 let payload = &event.payload;
1260
1261 if topic.as_str() == "task.start" || topic.as_str() == "task.resume" {
1262 format!(
1263 "Event: {} - <top-level-prompt>\n{}\n</top-level-prompt>",
1264 topic, payload
1265 )
1266 } else {
1267 format!("Event: {} - {}", topic, payload)
1268 }
1269 }
1270
1271 fn check_hat_exhaustion(&mut self, hat_id: &HatId, dropped: &[Event]) -> (bool, Option<Event>) {
1272 let Some(config) = self.registry.get_config(hat_id) else {
1273 return (false, None);
1274 };
1275 let Some(max) = config.max_activations else {
1276 return (false, None);
1277 };
1278
1279 let count = *self.state.hat_activation_counts.get(hat_id).unwrap_or(&0);
1280 if count < max {
1281 return (false, None);
1282 }
1283
1284 let should_emit = self.state.exhausted_hats.insert(hat_id.clone());
1286
1287 if !should_emit {
1288 return (true, None);
1290 }
1291
1292 let mut dropped_topics: Vec<String> = dropped.iter().map(|e| e.topic.to_string()).collect();
1293 dropped_topics.sort();
1294
1295 let payload = format!(
1296 "Hat '{hat}' exhausted.\n- max_activations: {max}\n- activations: {count}\n- dropped_topics:\n - {topics}",
1297 hat = hat_id.as_str(),
1298 max = max,
1299 count = count,
1300 topics = dropped_topics.join("\n - ")
1301 );
1302
1303 warn!(
1304 hat = %hat_id.as_str(),
1305 max_activations = max,
1306 activations = count,
1307 "Hat exhausted (max_activations reached)"
1308 );
1309
1310 (
1311 true,
1312 Some(Event::new(
1313 format!("{}.exhausted", hat_id.as_str()),
1314 payload,
1315 )),
1316 )
1317 }
1318
1319 fn record_hat_activations(&mut self, active_hat_ids: &[HatId]) {
1320 for hat_id in active_hat_ids {
1321 *self
1322 .state
1323 .hat_activation_counts
1324 .entry(hat_id.clone())
1325 .or_insert(0) += 1;
1326 }
1327 }
1328
1329 pub fn get_active_hat_id(&self) -> HatId {
1332 for hat_id in self.bus.hat_ids() {
1334 let Some(events) = self.bus.peek_pending(hat_id) else {
1335 continue;
1336 };
1337 let Some(event) = events.first() else {
1338 continue;
1339 };
1340 if let Some(active_hat) = self.registry.get_for_topic(event.topic.as_str()) {
1341 return active_hat.id.clone();
1342 }
1343 }
1344 HatId::new("ralph")
1345 }
1346
1347 pub fn record_event_count(&mut self) -> usize {
1352 self.event_reader
1353 .read_new_events()
1354 .map(|r| r.events.len())
1355 .unwrap_or(0)
1356 }
1357
1358 pub fn check_default_publishes(&mut self, hat_id: &HatId, _events_before: usize) {
1364 let events_after = self
1365 .event_reader
1366 .read_new_events()
1367 .map(|r| r.events.len())
1368 .unwrap_or(0);
1369
1370 if events_after == 0
1371 && let Some(config) = self.registry.get_config(hat_id)
1372 && let Some(default_topic) = &config.default_publishes
1373 {
1374 let default_event = Event::new(default_topic.as_str(), "").with_source(hat_id.clone());
1376
1377 debug!(
1378 hat = %hat_id.as_str(),
1379 topic = %default_topic,
1380 "No events written by hat, injecting default_publishes event"
1381 );
1382
1383 self.bus.publish(default_event);
1384 }
1385 }
1386
1387 pub fn bus(&mut self) -> &mut EventBus {
1392 &mut self.bus
1393 }
1394
1395 pub fn process_output(
1399 &mut self,
1400 hat_id: &HatId,
1401 output: &str,
1402 success: bool,
1403 ) -> Option<TerminationReason> {
1404 self.state.iteration += 1;
1405 self.state.last_hat = Some(hat_id.clone());
1406
1407 if let Some(interval_secs) = self.config.robot.checkin_interval_seconds
1409 && let Some(ref robot_service) = self.robot_service
1410 {
1411 let elapsed = self.state.elapsed();
1412 let interval = std::time::Duration::from_secs(interval_secs);
1413 let last = self
1414 .state
1415 .last_checkin_at
1416 .map(|t| t.elapsed())
1417 .unwrap_or(elapsed);
1418
1419 if last >= interval {
1420 let context = self.build_checkin_context(hat_id);
1421 match robot_service.send_checkin(self.state.iteration, elapsed, Some(&context)) {
1422 Ok(_) => {
1423 self.state.last_checkin_at = Some(std::time::Instant::now());
1424 debug!(iteration = self.state.iteration, "Sent robot check-in");
1425 }
1426 Err(e) => {
1427 warn!(error = %e, "Failed to send robot check-in");
1428 }
1429 }
1430 }
1431 }
1432
1433 self.diagnostics.log_orchestration(
1435 self.state.iteration,
1436 "loop",
1437 crate::diagnostics::OrchestrationEvent::IterationStarted,
1438 );
1439
1440 self.diagnostics.log_orchestration(
1442 self.state.iteration,
1443 "loop",
1444 crate::diagnostics::OrchestrationEvent::HatSelected {
1445 hat: hat_id.to_string(),
1446 reason: "process_output".to_string(),
1447 },
1448 );
1449
1450 if success {
1452 self.state.consecutive_failures = 0;
1453 } else {
1454 self.state.consecutive_failures += 1;
1455 }
1456
1457 let _ = output;
1458
1459 self.check_termination()
1465 }
1466
1467 fn extract_task_id(payload: &str) -> String {
1470 payload
1471 .lines()
1472 .next()
1473 .unwrap_or("unknown")
1474 .trim()
1475 .to_string()
1476 }
1477
1478 pub fn add_cost(&mut self, cost: f64) {
1480 self.state.cumulative_cost += cost;
1481 }
1482
1483 fn verify_scratchpad_complete(&self) -> Result<bool, std::io::Error> {
1490 let scratchpad_path = self.scratchpad_path();
1491
1492 if !scratchpad_path.exists() {
1493 return Err(std::io::Error::new(
1494 std::io::ErrorKind::NotFound,
1495 "Scratchpad does not exist",
1496 ));
1497 }
1498
1499 let content = std::fs::read_to_string(scratchpad_path)?;
1500
1501 let has_pending = content
1502 .lines()
1503 .any(|line| line.trim_start().starts_with("- [ ]"));
1504
1505 Ok(!has_pending)
1506 }
1507
1508 fn verify_tasks_complete(&self) -> Result<bool, std::io::Error> {
1509 use crate::task_store::TaskStore;
1510
1511 let tasks_path = self.tasks_path();
1512
1513 if !tasks_path.exists() {
1515 return Ok(true);
1516 }
1517
1518 let store = TaskStore::load(&tasks_path)?;
1519 Ok(!store.has_pending_tasks())
1520 }
1521
1522 fn build_checkin_context(&self, hat_id: &HatId) -> CheckinContext {
1524 let (open_tasks, closed_tasks) = self.count_tasks();
1525 CheckinContext {
1526 current_hat: Some(hat_id.as_str().to_string()),
1527 open_tasks,
1528 closed_tasks,
1529 cumulative_cost: self.state.cumulative_cost,
1530 }
1531 }
1532
1533 fn count_tasks(&self) -> (usize, usize) {
1538 use crate::task::TaskStatus;
1539 use crate::task_store::TaskStore;
1540
1541 let tasks_path = self.tasks_path();
1542 if !tasks_path.exists() {
1543 return (0, 0);
1544 }
1545
1546 match TaskStore::load(&tasks_path) {
1547 Ok(store) => {
1548 let total = store.all().len();
1549 let open = store.open().len();
1550 let closed = total - open;
1551 debug_assert_eq!(
1553 closed,
1554 store
1555 .all()
1556 .iter()
1557 .filter(|t| t.status == TaskStatus::Closed)
1558 .count()
1559 );
1560 (open, closed)
1561 }
1562 Err(_) => (0, 0),
1563 }
1564 }
1565
1566 fn get_open_task_list(&self) -> Vec<String> {
1568 use crate::task_store::TaskStore;
1569
1570 let tasks_path = self.tasks_path();
1571 if let Ok(store) = TaskStore::load(&tasks_path) {
1572 return store
1573 .open()
1574 .iter()
1575 .map(|t| format!("{}: {}", t.id, t.title))
1576 .collect();
1577 }
1578 vec![]
1579 }
1580
1581 fn warn_on_mutation_evidence(&self, evidence: &crate::event_parser::BackpressureEvidence) {
1582 let threshold = self.config.event_loop.mutation_score_warn_threshold;
1583
1584 match &evidence.mutants {
1585 Some(mutants) => {
1586 if let Some(reason) = Self::mutation_warning_reason(mutants, threshold) {
1587 warn!(
1588 reason = %reason,
1589 mutants_status = ?mutants.status,
1590 mutants_score = mutants.score_percent,
1591 mutants_threshold = threshold,
1592 "Mutation testing warning"
1593 );
1594 }
1595 }
1596 None => {
1597 if let Some(threshold) = threshold {
1598 warn!(
1599 mutants_threshold = threshold,
1600 "Mutation testing warning: missing mutation evidence in build.done payload"
1601 );
1602 }
1603 }
1604 }
1605 }
1606
1607 fn mutation_warning_reason(
1608 mutants: &MutationEvidence,
1609 threshold: Option<f64>,
1610 ) -> Option<String> {
1611 match mutants.status {
1612 MutationStatus::Fail => Some("mutation testing failed".to_string()),
1613 MutationStatus::Warn => Some(Self::format_mutation_message(
1614 "mutation score below threshold",
1615 mutants.score_percent,
1616 )),
1617 MutationStatus::Unknown => Some("mutation testing status unknown".to_string()),
1618 MutationStatus::Pass => {
1619 let threshold = threshold?;
1620
1621 match mutants.score_percent {
1622 Some(score) if score < threshold => Some(format!(
1623 "mutation score {:.2}% below threshold {:.2}%",
1624 score, threshold
1625 )),
1626 Some(_) => None,
1627 None => Some(format!(
1628 "mutation score missing (threshold {:.2}%)",
1629 threshold
1630 )),
1631 }
1632 }
1633 }
1634 }
1635
1636 fn format_mutation_message(message: &str, score: Option<f64>) -> String {
1637 match score {
1638 Some(score) => format!("{message} ({score:.2}%)"),
1639 None => message.to_string(),
1640 }
1641 }
1642
    /// Reads events appended to the JSONL event file since the last read,
    /// validates them, and publishes the results on the event bus.
    ///
    /// Handling, in order:
    /// - Each malformed line is published as `event.malformed` and increments
    ///   `consecutive_malformed_events`. That counter is reset to 0 whenever the
    ///   batch contains at least one well-formed event.
    /// - The configured completion topic sets `completion_requested` only when it
    ///   is the last well-formed event of the batch. It is never published.
    /// - `build.done`, `review.done`, and `verify.passed` must carry parseable
    ///   evidence that passes its checks. Otherwise they are replaced by
    ///   `build.blocked`, `review.blocked`, or `verify.failed` respectively.
    /// - `build.blocked` events are counted per task id (first payload line).
    ///   Once a task's count reaches 3, `build.task.abandoned` is published for
    ///   it (at most once per task).
    /// - For the first `human.interact` event, if a robot service is attached,
    ///   the payload is sent as a question and `wait_for_response` is called.
    ///   Any reply is published as `human.response` after all validated events.
    ///
    /// Returns `Ok(false)` when nothing new was read. Otherwise returns `Ok(true)`
    /// if at least one validated event's topic has no subscriber in the hat
    /// registry, and `Ok(false)` if every such topic has one.
    ///
    /// # Errors
    ///
    /// Propagates any error from `self.event_reader.read_new_events()`.
    pub fn process_events_from_jsonl(&mut self) -> std::io::Result<bool> {
        let result = self.event_reader.read_new_events()?;

        // Surface every unparseable line as an `event.malformed` event and track
        // the running count of malformed lines.
        for malformed in &result.malformed {
            let payload = format!(
                "Line {}: {}\nContent: {}",
                malformed.line_number, malformed.error, &malformed.content
            );
            let event = Event::new("event.malformed", &payload);
            self.bus.publish(event);
            self.state.consecutive_malformed_events += 1;
            warn!(
                line = malformed.line_number,
                consecutive = self.state.consecutive_malformed_events,
                "Malformed event line detected"
            );
        }

        // Any well-formed event in this batch breaks the malformed streak.
        if !result.events.is_empty() {
            self.state.consecutive_malformed_events = 0;
        }

        if result.events.is_empty() && result.malformed.is_empty() {
            return Ok(false);
        }

        let mut has_orphans = false;

        // Events to publish, in input order: accepted events plus the
        // blocked/failed replacements for rejected ones.
        let mut validated_events = Vec::new();
        let completion_topic = self.config.event_loop.completion_promise.as_str();
        let total_events = result.events.len();
        for (index, event) in result.events.into_iter().enumerate() {
            let payload = event.payload.clone().unwrap_or_default();

            // The completion event is honored only as the final event of the batch,
            // and is consumed here rather than published on the bus.
            if event.topic == completion_topic {
                if index + 1 == total_events {
                    self.state.completion_requested = true;
                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::EventPublished {
                            topic: event.topic.clone(),
                        },
                    );
                    info!(
                        topic = %event.topic,
                        "Completion event detected in JSONL"
                    );
                } else {
                    warn!(
                        topic = %event.topic,
                        index = index,
                        total_events = total_events,
                        "Completion event ignored because it was not the last event"
                    );
                }
                continue;
            }

            if event.topic == "build.done" {
                // build.done is accepted only with backpressure evidence that all passes.
                if let Some(evidence) = EventParser::parse_backpressure_evidence(&payload) {
                    if evidence.all_passed() {
                        self.warn_on_mutation_evidence(&evidence);
                        validated_events.push(Event::new(event.topic.as_str(), &payload));
                    } else {
                        warn!(
                            tests = evidence.tests_passed,
                            lint = evidence.lint_passed,
                            typecheck = evidence.typecheck_passed,
                            audit = evidence.audit_passed,
                            coverage = evidence.coverage_passed,
                            complexity = evidence.complexity_score,
                            duplication = evidence.duplication_passed,
                            performance = evidence.performance_regression,
                            specs = evidence.specs_verified,
                            "build.done rejected: backpressure checks failed"
                        );

                        // Human-readable renderings of the optional evidence fields.
                        let complexity = evidence
                            .complexity_score
                            .map(|value| format!("{value:.2}"))
                            .unwrap_or_else(|| "missing".to_string());
                        let performance = match evidence.performance_regression {
                            Some(true) => "regression".to_string(),
                            Some(false) => "pass".to_string(),
                            None => "missing".to_string(),
                        };
                        let specs = match evidence.specs_verified {
                            Some(true) => "pass".to_string(),
                            Some(false) => "fail".to_string(),
                            None => "not reported".to_string(),
                        };

                        self.diagnostics.log_orchestration(
                            self.state.iteration,
                            "jsonl",
                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                                reason: format!(
                                    "backpressure checks failed: tests={}, lint={}, typecheck={}, audit={}, coverage={}, complexity={}, duplication={}, performance={}, specs={}",
                                    evidence.tests_passed,
                                    evidence.lint_passed,
                                    evidence.typecheck_passed,
                                    evidence.audit_passed,
                                    evidence.coverage_passed,
                                    complexity,
                                    evidence.duplication_passed,
                                    performance,
                                    specs
                                ),
                            },
                        );

                        // NOTE(review): this payload is a fixed message, so the task id later
                        // derived from its first line is the same for every synthetic block.
                        validated_events.push(Event::new(
                            "build.blocked",
                            "Backpressure checks failed. Fix tests/lint/typecheck/audit/coverage/complexity/duplication/specs before emitting build.done.",
                        ));
                    }
                } else {
                    warn!("build.done rejected: missing backpressure evidence");

                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                            reason: "missing backpressure evidence".to_string(),
                        },
                    );

                    validated_events.push(Event::new(
                        "build.blocked",
                        "Missing backpressure evidence. Include 'tests: pass', 'lint: pass', 'typecheck: pass', 'audit: pass', 'coverage: pass', 'complexity: <score>', 'duplication: pass', 'performance: pass' (optional), 'specs: pass' (optional) in build.done payload.",
                    ));
                }
            } else if event.topic == "review.done" {
                // review.done is accepted only with verified test/build evidence.
                if let Some(evidence) = EventParser::parse_review_evidence(&payload) {
                    if evidence.is_verified() {
                        validated_events.push(Event::new(event.topic.as_str(), &payload));
                    } else {
                        warn!(
                            tests = evidence.tests_passed,
                            build = evidence.build_passed,
                            "review.done rejected: verification checks failed"
                        );

                        self.diagnostics.log_orchestration(
                            self.state.iteration,
                            "jsonl",
                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                                reason: format!(
                                    "review verification failed: tests={}, build={}",
                                    evidence.tests_passed, evidence.build_passed
                                ),
                            },
                        );

                        validated_events.push(Event::new(
                            "review.blocked",
                            "Review verification failed. Run tests and build before emitting review.done.",
                        ));
                    }
                } else {
                    warn!("review.done rejected: missing verification evidence");

                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                            reason: "missing review verification evidence".to_string(),
                        },
                    );

                    validated_events.push(Event::new(
                        "review.blocked",
                        "Missing verification evidence. Include 'tests: pass' and 'build: pass' in review.done payload.",
                    ));
                }
            } else if event.topic == "verify.passed" {
                // verify.passed is accepted only with a quality report that meets thresholds.
                if let Some(report) = EventParser::parse_quality_report(&payload) {
                    if report.meets_thresholds() {
                        validated_events.push(Event::new(event.topic.as_str(), &payload));
                    } else {
                        let failed = report.failed_dimensions();
                        let reason = if failed.is_empty() {
                            "quality thresholds failed".to_string()
                        } else {
                            format!("quality thresholds failed: {}", failed.join(", "))
                        };

                        warn!(
                            failed_dimensions = ?failed,
                            "verify.passed rejected: quality thresholds failed"
                        );

                        self.diagnostics.log_orchestration(
                            self.state.iteration,
                            "jsonl",
                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                                reason,
                            },
                        );

                        validated_events.push(Event::new(
                            "verify.failed",
                            "Quality thresholds failed. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity with thresholds in verify.passed payload.",
                        ));
                    }
                } else {
                    warn!("verify.passed rejected: missing quality report");

                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                            reason: "missing quality report".to_string(),
                        },
                    );

                    validated_events.push(Event::new(
                        "verify.failed",
                        "Missing quality report. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity in verify.passed payload.",
                    ));
                }
            } else if event.topic == "verify.failed" {
                // verify.failed is always passed through; a missing report only warns.
                if EventParser::parse_quality_report(&payload).is_none() {
                    warn!("verify.failed missing quality report");
                }
                validated_events.push(Event::new(event.topic.as_str(), &payload));
            } else {
                // All other topics pass through unvalidated.
                validated_events.push(Event::new(event.topic.as_str(), &payload));
            }
        }

        // Only `build.blocked` participates in per-task block tracking
        // (`review.blocked` / `verify.failed` do not).
        let blocked_events: Vec<_> = validated_events
            .iter()
            .filter(|e| e.topic == "build.blocked".into())
            .collect();

        for blocked_event in &blocked_events {
            let task_id = Self::extract_task_id(&blocked_event.payload);

            // NOTE(review): counts are only ever incremented in this function; despite
            // the "consecutive" wording below, this is a running total per task id
            // unless it is reset elsewhere — confirm.
            let count = self
                .state
                .task_block_counts
                .entry(task_id.clone())
                .or_insert(0);
            *count += 1;

            debug!(
                task_id = %task_id,
                block_count = *count,
                "Task blocked"
            );

            // Abandon a task at most once, the first time its count reaches 3.
            if *count >= 3 && !self.state.abandoned_tasks.contains(&task_id) {
                warn!(
                    task_id = %task_id,
                    "Task abandoned after 3 consecutive blocks"
                );

                self.state.abandoned_tasks.push(task_id.clone());

                self.diagnostics.log_orchestration(
                    self.state.iteration,
                    "jsonl",
                    crate::diagnostics::OrchestrationEvent::TaskAbandoned {
                        reason: format!(
                            "3 consecutive build.blocked events for task '{}'",
                            task_id
                        ),
                    },
                );

                let abandoned_event = Event::new(
                    "build.task.abandoned",
                    format!(
                        "Task '{}' abandoned after 3 consecutive build.blocked events",
                        task_id
                    ),
                );

                self.bus.publish(abandoned_event);
            }
        }

        let has_blocked_event = !blocked_events.is_empty();

        // Track batches containing a build.blocked in a row; a batch without one
        // resets the streak and clears the last blocked hat.
        if has_blocked_event {
            self.state.consecutive_blocked += 1;
        } else {
            self.state.consecutive_blocked = 0;
            self.state.last_blocked_hat = None;
        }

        // Holds the human's reply, if any; published after all validated events.
        let mut response_event = None;
        // Only the first human.interact event in the batch is forwarded to the robot.
        let ask_human_idx = validated_events
            .iter()
            .position(|e| e.topic == "human.interact".into());

        if let Some(idx) = ask_human_idx {
            let ask_event = &validated_events[idx];
            let payload = ask_event.payload.clone();

            if let Some(ref robot_service) = self.robot_service {
                info!(
                    payload = %payload,
                    "human.interact event detected — sending question via robot service"
                );

                let send_ok = match robot_service.send_question(&payload) {
                    Ok(_message_id) => true,
                    Err(e) => {
                        warn!(
                            error = %e,
                            "Failed to send human.interact question after retries — treating as timeout"
                        );
                        // NOTE(review): retry_count is hard-coded to 3 here; confirm it matches
                        // send_question's actual retry policy.
                        self.diagnostics.log_error(
                            self.state.iteration,
                            "telegram",
                            crate::diagnostics::DiagnosticError::TelegramSendError {
                                operation: "send_question".to_string(),
                                error: e.to_string(),
                                retry_count: 3,
                            },
                        );
                        false
                    }
                };

                if send_ok {
                    // Resolve the events file to hand to wait_for_response, in priority order:
                    // 1. the path named in the loop context's current-events marker, joined
                    //    onto the loop workspace;
                    // 2. the path named in `.ralph/current-events` (relative to the CWD);
                    // 3. the loop context's events path, else `.ralph/events.jsonl`.
                    let events_path = self
                        .loop_context
                        .as_ref()
                        .and_then(|ctx| {
                            std::fs::read_to_string(ctx.current_events_marker())
                                .ok()
                                .map(|s| ctx.workspace().join(s.trim()))
                        })
                        .or_else(|| {
                            std::fs::read_to_string(".ralph/current-events")
                                .ok()
                                .map(|s| PathBuf::from(s.trim()))
                        })
                        .unwrap_or_else(|| {
                            self.loop_context
                                .as_ref()
                                .map(|ctx| ctx.events_path())
                                .unwrap_or_else(|| PathBuf::from(".ralph/events.jsonl"))
                        });

                    // Timeout (Ok(None)) and errors are logged and the loop continues
                    // without a response.
                    match robot_service.wait_for_response(&events_path) {
                        Ok(Some(response)) => {
                            info!(
                                response = %response,
                                "Received human.response — continuing loop"
                            );
                            response_event = Some(Event::new("human.response", &response));
                        }
                        Ok(None) => {
                            warn!(
                                timeout_secs = robot_service.timeout_secs(),
                                "Human response timeout — continuing without response"
                            );
                        }
                        Err(e) => {
                            warn!(
                                error = %e,
                                "Error waiting for human response — continuing without response"
                            );
                        }
                    }
                }
            } else {
                debug!(
                    "human.interact event detected but no robot service active — passing through"
                );
            }
        }

        // Publish everything that survived validation (including human.interact itself),
        // noting whether any topic has no subscribing hat.
        for event in validated_events {
            self.diagnostics.log_orchestration(
                self.state.iteration,
                "jsonl",
                crate::diagnostics::OrchestrationEvent::EventPublished {
                    topic: event.topic.to_string(),
                },
            );

            if !self.registry.has_subscriber(event.topic.as_str()) {
                has_orphans = true;
            }

            debug!(
                topic = %event.topic,
                "Publishing event from JSONL"
            );
            self.bus.publish(event);
        }

        // The human's reply goes out last, after the batch it answers.
        if let Some(response) = response_event {
            info!(
                topic = %response.topic,
                "Publishing human.response event from robot service"
            );
            self.bus.publish(response);
        }

        Ok(has_orphans)
    }
2088
2089 pub fn check_ralph_completion(&self, output: &str) -> bool {
2093 let events = EventParser::new().parse(output);
2094 events
2095 .iter()
2096 .any(|event| event.topic.as_str() == self.config.event_loop.completion_promise)
2097 }
2098
2099 pub fn publish_terminate_event(&mut self, reason: &TerminationReason) -> Event {
2106 self.stop_robot_service();
2108
2109 let elapsed = self.state.elapsed();
2110 let duration_str = format_duration(elapsed);
2111
2112 let payload = format!(
2113 "## Reason\n{}\n\n## Status\n{}\n\n## Summary\n- Iterations: {}\n- Duration: {}\n- Exit code: {}",
2114 reason.as_str(),
2115 termination_status_text(reason),
2116 self.state.iteration,
2117 duration_str,
2118 reason.exit_code()
2119 );
2120
2121 let event = Event::new("loop.terminate", &payload);
2122
2123 self.bus.publish(event.clone());
2125
2126 info!(
2127 reason = %reason.as_str(),
2128 iterations = self.state.iteration,
2129 duration = %duration_str,
2130 "Wrapping up: {}. {} iterations in {}.",
2131 reason.as_str(),
2132 self.state.iteration,
2133 duration_str
2134 );
2135
2136 event
2137 }
2138
2139 pub fn robot_shutdown_flag(&self) -> Option<Arc<AtomicBool>> {
2144 self.robot_service.as_ref().map(|s| s.shutdown_flag())
2145 }
2146
2147 fn stop_robot_service(&mut self) {
2151 if let Some(service) = self.robot_service.take() {
2152 service.stop();
2153 }
2154 }
2155
2156 pub fn check_for_user_prompt(&self, events: &[Event]) -> Option<UserPrompt> {
2164 events
2165 .iter()
2166 .find(|e| e.topic.as_str() == "user.prompt")
2167 .map(|e| UserPrompt {
2168 id: Self::extract_prompt_id(&e.payload),
2169 text: e.payload.clone(),
2170 })
2171 }
2172
2173 fn extract_prompt_id(payload: &str) -> String {
2178 if let Some(start) = payload.find("id=\"")
2180 && let Some(end) = payload[start + 4..].find('"')
2181 {
2182 return payload[start + 4..start + 4 + end].to_string();
2183 }
2184
2185 format!("q{}", Self::generate_prompt_id())
2187 }
2188
2189 fn generate_prompt_id() -> String {
2192 use std::time::{SystemTime, UNIX_EPOCH};
2193 let nanos = SystemTime::now()
2194 .duration_since(UNIX_EPOCH)
2195 .unwrap()
2196 .as_nanos();
2197 format!("{:x}", nanos % 0xFFFF_FFFF)
2198 }
2199}
2200
/// A prompt addressed to the human, taken from a `user.prompt` event.
#[derive(Debug, Clone)]
pub struct UserPrompt {
    /// Identifier from an `id="..."` attribute in the payload, or a generated
    /// `q<hex>` id when the payload has none.
    pub id: String,
    /// The event payload, unmodified.
    pub text: String,
}
2211
/// Renders a duration as a compact human-readable string, truncating to whole
/// seconds.
///
/// Leading zero units are omitted: `"42s"`, `"3m 5s"`, `"1h 0m 7s"`.
fn format_duration(d: Duration) -> String {
    let whole_secs = d.as_secs();
    let (hours, minutes, seconds) = (whole_secs / 3600, whole_secs / 60 % 60, whole_secs % 60);

    match (hours, minutes) {
        (0, 0) => format!("{}s", seconds),
        (0, _) => format!("{}m {}s", minutes, seconds),
        _ => format!("{}h {}m {}s", hours, minutes, seconds),
    }
}
2227
2228fn termination_status_text(reason: &TerminationReason) -> &'static str {
2230 match reason {
2231 TerminationReason::CompletionPromise => "All tasks completed successfully.",
2232 TerminationReason::MaxIterations => "Stopped at iteration limit.",
2233 TerminationReason::MaxRuntime => "Stopped at runtime limit.",
2234 TerminationReason::MaxCost => "Stopped at cost limit.",
2235 TerminationReason::ConsecutiveFailures => "Too many consecutive failures.",
2236 TerminationReason::LoopThrashing => {
2237 "Loop thrashing detected - same hat repeatedly blocked."
2238 }
2239 TerminationReason::ValidationFailure => "Too many consecutive malformed JSONL events.",
2240 TerminationReason::Stopped => "Manually stopped.",
2241 TerminationReason::Interrupted => "Interrupted by signal.",
2242 TerminationReason::RestartRequested => "Restarting by human request.",
2243 }
2244}