1mod loop_state;
6#[cfg(test)]
7mod tests;
8
9pub use loop_state::LoopState;
10
11use crate::config::{HatBackend, InjectMode, RalphConfig};
12use crate::event_parser::EventParser;
13use crate::event_reader::EventReader;
14use crate::hat_registry::HatRegistry;
15use crate::hatless_ralph::HatlessRalph;
16use crate::instructions::InstructionBuilder;
17use crate::loop_context::LoopContext;
18use crate::memory_store::{MarkdownMemoryStore, format_memories_as_markdown, truncate_to_budget};
19use crate::skill_registry::SkillRegistry;
20use ralph_proto::{Event, EventBus, Hat, HatId};
21use ralph_telegram::TelegramService;
22use std::path::PathBuf;
23use std::sync::Arc;
24use std::sync::atomic::AtomicBool;
25use std::time::Duration;
26use tracing::{debug, info, warn};
27
/// Why the event loop stopped running.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationReason {
    /// The agent emitted the configured completion promise.
    CompletionPromise,
    /// Iteration budget exhausted.
    MaxIterations,
    /// Wall-clock budget exhausted.
    MaxRuntime,
    /// Cost budget exhausted.
    MaxCost,
    /// Too many back-to-back failed iterations.
    ConsecutiveFailures,
    /// The loop kept re-dispatching abandoned tasks.
    LoopThrashing,
    /// Repeated malformed events from the agent.
    ValidationFailure,
    /// Stopped on request.
    Stopped,
    /// Interrupted (e.g. SIGINT).
    Interrupted,
    /// Chaos mode finished successfully.
    ChaosModeComplete,
    /// Chaos mode hit its iteration cap.
    ChaosModeMaxIterations,
    /// A restart marker was found on disk.
    RestartRequested,
}

impl TerminationReason {
    /// Process exit code for this reason.
    ///
    /// 0 = success, 1 = failure, 2 = budget/limit reached,
    /// 3 = restart requested, 130 = interrupted (SIGINT convention).
    pub fn exit_code(&self) -> i32 {
        use TerminationReason::*;
        match self {
            CompletionPromise | ChaosModeComplete => 0,
            ConsecutiveFailures | LoopThrashing | ValidationFailure | Stopped => 1,
            MaxIterations | MaxRuntime | MaxCost | ChaosModeMaxIterations => 2,
            RestartRequested => 3,
            Interrupted => 130,
        }
    }

    /// Stable machine-readable name for logs and reports.
    pub fn as_str(&self) -> &'static str {
        use TerminationReason::*;
        match self {
            CompletionPromise => "completed",
            MaxIterations => "max_iterations",
            MaxRuntime => "max_runtime",
            MaxCost => "max_cost",
            ConsecutiveFailures => "consecutive_failures",
            LoopThrashing => "loop_thrashing",
            ValidationFailure => "validation_failure",
            Stopped => "stopped",
            Interrupted => "interrupted",
            ChaosModeComplete => "chaos_complete",
            ChaosModeMaxIterations => "chaos_max_iterations",
            RestartRequested => "restart_requested",
        }
    }

    /// True for the reasons that count as a successful run.
    pub fn is_success(&self) -> bool {
        use TerminationReason::*;
        matches!(self, CompletionPromise | ChaosModeComplete)
    }

    /// Only a regular completion may roll over into chaos mode.
    pub fn triggers_chaos_mode(&self) -> bool {
        *self == TerminationReason::CompletionPromise
    }
}
118
/// Orchestrates hats through iterations of the agent loop.
pub struct EventLoop {
    /// Full runtime configuration.
    config: RalphConfig,
    /// Custom hats loaded from configuration.
    registry: HatRegistry,
    /// Pub/sub bus carrying events between hats.
    bus: EventBus,
    /// Mutable per-run counters (iteration, failures, cost, ...).
    state: LoopState,
    /// Builds prompts for custom hats.
    instruction_builder: InstructionBuilder,
    /// The fallback/coordinator agent registered as "ralph".
    ralph: HatlessRalph,
    /// Accumulated `human.guidance` payloads awaiting injection.
    robot_guidance: Vec<String>,
    /// Reads newly appended events from the events file.
    pub(crate) event_reader: EventReader,
    /// Orchestration diagnostics (may be a disabled collector).
    diagnostics: crate::diagnostics::DiagnosticsCollector,
    /// Per-loop paths; `None` when running from the current directory.
    loop_context: Option<LoopContext>,
    /// Skills available for prompt injection.
    skill_registry: SkillRegistry,
    /// Telegram human-in-the-loop service (started only on the primary loop).
    telegram_service: Option<TelegramService>,
}
141
142impl EventLoop {
143 pub fn new(config: RalphConfig) -> Self {
145 let diagnostics = crate::diagnostics::DiagnosticsCollector::new(std::path::Path::new("."))
148 .unwrap_or_else(|e| {
149 debug!(
150 "Failed to initialize diagnostics: {}, using disabled collector",
151 e
152 );
153 crate::diagnostics::DiagnosticsCollector::disabled()
154 });
155
156 Self::with_diagnostics(config, diagnostics)
157 }
158
159 pub fn with_context(config: RalphConfig, context: LoopContext) -> Self {
165 let diagnostics = crate::diagnostics::DiagnosticsCollector::new(context.workspace())
166 .unwrap_or_else(|e| {
167 debug!(
168 "Failed to initialize diagnostics: {}, using disabled collector",
169 e
170 );
171 crate::diagnostics::DiagnosticsCollector::disabled()
172 });
173
174 Self::with_context_and_diagnostics(config, context, diagnostics)
175 }
176
177 pub fn with_context_and_diagnostics(
179 config: RalphConfig,
180 context: LoopContext,
181 diagnostics: crate::diagnostics::DiagnosticsCollector,
182 ) -> Self {
183 let registry = HatRegistry::from_config(&config);
184 let instruction_builder = InstructionBuilder::with_events(
185 &config.event_loop.completion_promise,
186 config.core.clone(),
187 config.events.clone(),
188 );
189
190 let mut bus = EventBus::new();
191
192 for hat in registry.all() {
196 bus.register(hat.clone());
197 }
198
199 let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); bus.register(ralph_hat);
203
204 if registry.is_empty() {
205 debug!("Solo mode: Ralph is the only coordinator");
206 } else {
207 debug!(
208 "Multi-hat mode: {} custom hats + Ralph as fallback",
209 registry.len()
210 );
211 }
212
213 let skill_registry = if config.skills.enabled {
215 SkillRegistry::from_config(
216 &config.skills,
217 context.workspace(),
218 Some(config.cli.backend.as_str()),
219 )
220 .unwrap_or_else(|e| {
221 warn!(
222 "Failed to build skill registry: {}, using empty registry",
223 e
224 );
225 SkillRegistry::new(Some(config.cli.backend.as_str()))
226 })
227 } else {
228 SkillRegistry::new(Some(config.cli.backend.as_str()))
229 };
230
231 let skill_index = if config.skills.enabled {
232 skill_registry.build_index(None)
233 } else {
234 String::new()
235 };
236
237 let ralph = HatlessRalph::new(
239 config.event_loop.completion_promise.clone(),
240 config.core.clone(),
241 ®istry,
242 config.event_loop.starting_event.clone(),
243 )
244 .with_memories_enabled(config.memories.enabled)
245 .with_skill_index(skill_index);
246
247 let events_path = std::fs::read_to_string(context.current_events_marker())
251 .map(|s| {
252 let relative = s.trim();
253 context.workspace().join(relative)
254 })
255 .unwrap_or_else(|_| context.events_path());
256 let event_reader = EventReader::new(&events_path);
257
258 let telegram_service = Self::create_telegram_service(&config, Some(&context));
261
262 Self {
263 config,
264 registry,
265 bus,
266 state: LoopState::new(),
267 instruction_builder,
268 ralph,
269 robot_guidance: Vec::new(),
270 event_reader,
271 diagnostics,
272 loop_context: Some(context),
273 skill_registry,
274 telegram_service,
275 }
276 }
277
278 pub fn with_diagnostics(
280 config: RalphConfig,
281 diagnostics: crate::diagnostics::DiagnosticsCollector,
282 ) -> Self {
283 let registry = HatRegistry::from_config(&config);
284 let instruction_builder = InstructionBuilder::with_events(
285 &config.event_loop.completion_promise,
286 config.core.clone(),
287 config.events.clone(),
288 );
289
290 let mut bus = EventBus::new();
291
292 for hat in registry.all() {
296 bus.register(hat.clone());
297 }
298
299 let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); bus.register(ralph_hat);
303
304 if registry.is_empty() {
305 debug!("Solo mode: Ralph is the only coordinator");
306 } else {
307 debug!(
308 "Multi-hat mode: {} custom hats + Ralph as fallback",
309 registry.len()
310 );
311 }
312
313 let workspace_root = std::path::Path::new(".");
315 let skill_registry = if config.skills.enabled {
316 SkillRegistry::from_config(
317 &config.skills,
318 workspace_root,
319 Some(config.cli.backend.as_str()),
320 )
321 .unwrap_or_else(|e| {
322 warn!(
323 "Failed to build skill registry: {}, using empty registry",
324 e
325 );
326 SkillRegistry::new(Some(config.cli.backend.as_str()))
327 })
328 } else {
329 SkillRegistry::new(Some(config.cli.backend.as_str()))
330 };
331
332 let skill_index = if config.skills.enabled {
333 skill_registry.build_index(None)
334 } else {
335 String::new()
336 };
337
338 let ralph = HatlessRalph::new(
340 config.event_loop.completion_promise.clone(),
341 config.core.clone(),
342 ®istry,
343 config.event_loop.starting_event.clone(),
344 )
345 .with_memories_enabled(config.memories.enabled)
346 .with_skill_index(skill_index);
347
348 let events_path = std::fs::read_to_string(".ralph/current-events")
351 .map(|s| s.trim().to_string())
352 .unwrap_or_else(|_| ".ralph/events.jsonl".to_string());
353 let event_reader = EventReader::new(&events_path);
354
355 let telegram_service = Self::create_telegram_service(&config, None);
358
359 Self {
360 config,
361 registry,
362 bus,
363 state: LoopState::new(),
364 instruction_builder,
365 ralph,
366 robot_guidance: Vec::new(),
367 event_reader,
368 diagnostics,
369 loop_context: None,
370 skill_registry,
371 telegram_service,
372 }
373 }
374
    /// Create and start the Telegram human-in-the-loop service, if enabled.
    ///
    /// Returns `None` when robot mode is disabled, when this loop is not
    /// the primary one (only the primary loop owns the bot), or when the
    /// service fails to construct or start. All failures are logged and
    /// degrade gracefully rather than aborting the loop.
    fn create_telegram_service(
        config: &RalphConfig,
        context: Option<&LoopContext>,
    ) -> Option<TelegramService> {
        if !config.robot.enabled {
            return None;
        }

        // Secondary loops skip Telegram so only one loop talks to the bot.
        if let Some(ctx) = context
            && !ctx.is_primary()
        {
            debug!(
                workspace = %ctx.workspace().display(),
                "Skipping Telegram service: not the primary loop"
            );
            return None;
        }

        // Prefer the context's workspace; fall back to the configured root.
        let workspace_root = context
            .map(|ctx| ctx.workspace().to_path_buf())
            .unwrap_or_else(|| config.core.workspace_root.clone());

        let bot_token = config.robot.resolve_bot_token();
        // Default response timeout: 5 minutes.
        let timeout_secs = config.robot.timeout_seconds.unwrap_or(300);
        let loop_id = context
            .and_then(|ctx| ctx.loop_id().map(String::from))
            .unwrap_or_else(|| "main".to_string());

        match TelegramService::new(workspace_root, bot_token, timeout_secs, loop_id) {
            Ok(service) => {
                // A service that was created but cannot start is discarded.
                if let Err(e) = service.start() {
                    warn!(error = %e, "Failed to start Telegram service");
                    return None;
                }
                info!(
                    bot_token = %service.bot_token_masked(),
                    timeout_secs = service.timeout_secs(),
                    "Telegram human-in-the-loop service active"
                );
                Some(service)
            }
            Err(e) => {
                warn!(error = %e, "Failed to create Telegram service");
                None
            }
        }
    }
432
    /// The loop context, when one was attached at construction.
    pub fn loop_context(&self) -> Option<&LoopContext> {
        self.loop_context.as_ref()
    }
437
438 fn tasks_path(&self) -> PathBuf {
440 self.loop_context
441 .as_ref()
442 .map(|ctx| ctx.tasks_path())
443 .unwrap_or_else(|| PathBuf::from(".ralph/agent/tasks.jsonl"))
444 }
445
446 fn scratchpad_path(&self) -> PathBuf {
448 self.loop_context
449 .as_ref()
450 .map(|ctx| ctx.scratchpad_path())
451 .unwrap_or_else(|| PathBuf::from(&self.config.core.scratchpad))
452 }
453
    /// Current per-run loop state.
    pub fn state(&self) -> &LoopState {
        &self.state
    }
458
    /// The configuration this loop was built with.
    pub fn config(&self) -> &RalphConfig {
        &self.config
    }
463
    /// The registry of custom hats.
    pub fn registry(&self) -> &HatRegistry {
        &self.registry
    }
468
469 pub fn get_hat_backend(&self, hat_id: &HatId) -> Option<&HatBackend> {
474 self.registry
475 .get_config(hat_id)
476 .and_then(|config| config.backend.as_ref())
477 }
478
    /// Register an additional observer that is notified of every event
    /// published on the bus.
    pub fn add_observer<F>(&mut self, observer: F)
    where
        F: Fn(&Event) + Send + 'static,
    {
        self.bus.add_observer(observer);
    }
489
    /// Replace the bus observer. Deprecated: prefer [`EventLoop::add_observer`],
    /// which supports multiple observers.
    #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
    pub fn set_observer<F>(&mut self, observer: F)
    where
        F: Fn(&Event) + Send + 'static,
    {
        // Delegating to a deprecated bus method; suppress the lint locally.
        #[allow(deprecated)]
        self.bus.set_observer(observer);
    }
501
    /// Check every termination condition and return the first one hit.
    ///
    /// Conditions are checked in priority order: iteration cap, runtime
    /// cap, cost cap, consecutive failures, loop thrashing, validation
    /// failures, then an on-disk restart marker. Returns `None` when the
    /// loop may continue. Also invoked at the end of `process_output`.
    pub fn check_termination(&self) -> Option<TerminationReason> {
        let cfg = &self.config.event_loop;

        if self.state.iteration >= cfg.max_iterations {
            return Some(TerminationReason::MaxIterations);
        }

        if self.state.elapsed().as_secs() >= cfg.max_runtime_seconds {
            return Some(TerminationReason::MaxRuntime);
        }

        // Cost cap is optional; only enforced when configured.
        if let Some(max_cost) = cfg.max_cost_usd
            && self.state.cumulative_cost >= max_cost
        {
            return Some(TerminationReason::MaxCost);
        }

        if self.state.consecutive_failures >= cfg.max_consecutive_failures {
            return Some(TerminationReason::ConsecutiveFailures);
        }

        // Hard-coded threshold: three re-dispatches of abandoned tasks
        // counts as thrashing.
        if self.state.abandoned_task_redispatches >= 3 {
            return Some(TerminationReason::LoopThrashing);
        }

        // Hard-coded threshold: three malformed events in a row.
        if self.state.consecutive_malformed_events >= 3 {
            return Some(TerminationReason::ValidationFailure);
        }

        // External restart signal: presence of a marker file is enough.
        let restart_path =
            std::path::Path::new(&self.config.core.workspace_root).join(".ralph/restart-requested");
        if restart_path.exists() {
            return Some(TerminationReason::RestartRequested);
        }

        None
    }
543
544 pub fn initialize(&mut self, prompt_content: &str) {
546 let topic = self
548 .config
549 .event_loop
550 .starting_event
551 .clone()
552 .unwrap_or_else(|| "task.start".to_string());
553 self.initialize_with_topic(&topic, prompt_content);
554 }
555
    /// Re-enter an existing run: publish `task.resume` instead of the
    /// configured starting event.
    pub fn initialize_resume(&mut self, prompt_content: &str) {
        self.initialize_with_topic("task.resume", prompt_content);
    }
564
565 fn initialize_with_topic(&mut self, topic: &str, prompt_content: &str) {
567 self.ralph.set_objective(prompt_content.to_string());
571
572 let start_event = Event::new(topic, prompt_content);
573 self.bus.publish(start_event);
574 debug!(topic = topic, "Published {} event", topic);
575 }
576
    /// Decide which hat runs next.
    ///
    /// Returns `None` when no hat has pending events. Otherwise, in solo
    /// mode the pending hat runs directly; in multi-hat mode the "ralph"
    /// hat always runs as coordinator regardless of which hat has events.
    pub fn next_hat(&self) -> Option<&HatId> {
        let next = self.bus.next_hat_with_pending();

        // Bail out early when nothing is pending anywhere.
        next.as_ref()?;

        if self.registry.is_empty() {
            next
        } else {
            // Multi-hat mode: route through the always-registered ralph hat.
            self.bus.hat_ids().find(|id| id.as_str() == "ralph")
        }
    }
602
    /// True when any hat still has undelivered events on the bus.
    pub fn has_pending_events(&self) -> bool {
        self.bus.next_hat_with_pending().is_some()
    }
610
611 pub fn has_pending_human_events(&self) -> bool {
616 self.bus.hat_ids().any(|hat_id| {
617 self.bus
618 .peek_pending(hat_id)
619 .map(|events| {
620 events.iter().any(|e| {
621 let topic = e.topic.as_str();
622 topic == "human.response" || topic == "human.guidance"
623 })
624 })
625 .unwrap_or(false)
626 })
627 }
628
629 pub fn get_hat_publishes(&self, hat_id: &HatId) -> Vec<String> {
633 self.registry
634 .get(hat_id)
635 .map(|hat| hat.publishes.iter().map(|t| t.to_string()).collect())
636 .unwrap_or_default()
637 }
638
639 pub fn inject_fallback_event(&mut self) -> bool {
646 let fallback_event = Event::new(
647 "task.resume",
648 "RECOVERY: Previous iteration did not publish an event. \
649 Review the scratchpad and either dispatch the next task or complete the loop.",
650 );
651
652 let fallback_event = match &self.state.last_hat {
655 Some(hat_id) if hat_id.as_str() != "ralph" => {
656 debug!(
657 hat = %hat_id.as_str(),
658 "Injecting fallback event to recover - targeting last hat with task.resume"
659 );
660 fallback_event.with_target(hat_id.clone())
661 }
662 _ => {
663 debug!("Injecting fallback event to recover - triggering Ralph with task.resume");
664 fallback_event
665 }
666 };
667
668 self.bus.publish(fallback_event);
669 true
670 }
671
672 pub fn build_prompt(&mut self, hat_id: &HatId) -> Option<String> {
686 if hat_id.as_str() == "ralph" {
689 if self.registry.is_empty() {
690 let events = self.bus.take_pending(&hat_id.clone());
692
693 let (guidance_events, regular_events): (Vec<_>, Vec<_>) = events
695 .into_iter()
696 .partition(|e| e.topic.as_str() == "human.guidance");
697
698 let events_context = regular_events
699 .iter()
700 .map(|e| Self::format_event(e))
701 .collect::<Vec<_>>()
702 .join("\n");
703
704 self.update_robot_guidance(guidance_events);
706 self.apply_robot_guidance();
707
708 let base_prompt = self.ralph.build_prompt(&events_context, &[]);
710 self.ralph.clear_robot_guidance();
711 let with_skills = self.prepend_auto_inject_skills(base_prompt);
712 let with_scratchpad = self.prepend_scratchpad(with_skills);
713 let final_prompt = self.prepend_ready_tasks(with_scratchpad);
714
715 debug!("build_prompt: routing to HatlessRalph (solo mode)");
716 return Some(final_prompt);
717 } else {
718 let mut all_hat_ids: Vec<HatId> = self.bus.hat_ids().cloned().collect();
720 all_hat_ids.sort_by(|a, b| a.as_str().cmp(b.as_str()));
722
723 let mut all_events = Vec::new();
724 let mut system_events = Vec::new();
725
726 for id in &all_hat_ids {
727 let pending = self.bus.take_pending(id);
728 if pending.is_empty() {
729 continue;
730 }
731
732 let (drop_pending, exhausted_event) = self.check_hat_exhaustion(id, &pending);
733 if drop_pending {
734 if let Some(exhausted_event) = exhausted_event {
736 all_events.push(exhausted_event.clone());
737 system_events.push(exhausted_event);
738 }
739 continue;
740 }
741
742 all_events.extend(pending);
743 }
744
745 for event in system_events {
748 self.bus.publish(event);
749 }
750
751 let (guidance_events, regular_events): (Vec<_>, Vec<_>) = all_events
753 .into_iter()
754 .partition(|e| e.topic.as_str() == "human.guidance");
755
756 self.update_robot_guidance(guidance_events);
759 self.apply_robot_guidance();
760
761 let active_hat_ids = self.determine_active_hat_ids(®ular_events);
763 self.record_hat_activations(&active_hat_ids);
764 let active_hats = self.determine_active_hats(®ular_events);
765
766 let events_context = regular_events
768 .iter()
769 .map(|e| Self::format_event(e))
770 .collect::<Vec<_>>()
771 .join("\n");
772
773 let base_prompt = self.ralph.build_prompt(&events_context, &active_hats);
775
776 debug!(
778 "build_prompt: routing to HatlessRalph (multi-hat coordinator mode), active_hats: {:?}",
779 active_hats
780 .iter()
781 .map(|h| h.id.as_str())
782 .collect::<Vec<_>>()
783 );
784
785 self.ralph.clear_robot_guidance();
787 let with_skills = self.prepend_auto_inject_skills(base_prompt);
788 let with_scratchpad = self.prepend_scratchpad(with_skills);
789 let final_prompt = self.prepend_ready_tasks(with_scratchpad);
790
791 return Some(final_prompt);
792 }
793 }
794
795 let events = self.bus.take_pending(&hat_id.clone());
799 let events_context = events
800 .iter()
801 .map(|e| Self::format_event(e))
802 .collect::<Vec<_>>()
803 .join("\n");
804
805 let hat = self.registry.get(hat_id)?;
806
807 debug!(
809 "build_prompt: hat_id='{}', instructions.is_empty()={}",
810 hat_id.as_str(),
811 hat.instructions.is_empty()
812 );
813
814 debug!(
816 "build_prompt: routing to build_custom_hat() for '{}'",
817 hat_id.as_str()
818 );
819 Some(
820 self.instruction_builder
821 .build_custom_hat(hat, &events_context),
822 )
823 }
824
825 fn update_robot_guidance(&mut self, guidance_events: Vec<Event>) {
831 if guidance_events.is_empty() {
832 return;
833 }
834
835 self.persist_guidance_to_scratchpad(&guidance_events);
837
838 self.robot_guidance
839 .extend(guidance_events.into_iter().map(|e| e.payload));
840 }
841
    /// Append each guidance payload to the scratchpad as a timestamped
    /// "HUMAN GUIDANCE" section.
    ///
    /// Best-effort: directory creation, open, and write failures are
    /// logged and silently ignored so guidance handling never aborts
    /// the loop.
    fn persist_guidance_to_scratchpad(&self, guidance_events: &[Event]) {
        use std::io::Write;

        // Relative scratchpad paths are resolved against the workspace root.
        let scratchpad_path = self.scratchpad_path();
        let resolved_path = if scratchpad_path.is_relative() {
            self.config.core.workspace_root.join(&scratchpad_path)
        } else {
            scratchpad_path
        };

        if let Some(parent) = resolved_path.parent()
            && !parent.exists()
            && let Err(e) = std::fs::create_dir_all(parent)
        {
            warn!("Failed to create scratchpad directory: {}", e);
            return;
        }

        let mut file = match std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&resolved_path)
        {
            Ok(f) => f,
            Err(e) => {
                warn!("Failed to open scratchpad for guidance persistence: {}", e);
                return;
            }
        };

        // One timestamp for the whole batch; each event gets its own section.
        let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
        for event in guidance_events {
            let entry = format!(
                "\n### HUMAN GUIDANCE ({})\n\n{}\n",
                timestamp, event.payload
            );
            if let Err(e) = file.write_all(entry.as_bytes()) {
                warn!("Failed to write guidance to scratchpad: {}", e);
            }
        }

        info!(
            count = guidance_events.len(),
            "Persisted human guidance to scratchpad"
        );
    }
893
894 fn apply_robot_guidance(&mut self) {
896 if self.robot_guidance.is_empty() {
897 return;
898 }
899
900 self.ralph.set_robot_guidance(self.robot_guidance.clone());
901 }
902
903 fn prepend_auto_inject_skills(&self, prompt: String) -> String {
913 let mut prefix = String::new();
914
915 self.inject_memories_and_tools_skill(&mut prefix);
917
918 self.inject_robot_skill(&mut prefix);
920
921 self.inject_custom_auto_skills(&mut prefix);
923
924 if prefix.is_empty() {
925 return prompt;
926 }
927
928 prefix.push_str("\n\n");
929 prefix.push_str(&prompt);
930 prefix
931 }
932
    /// Append stored memories (when enabled and in auto-inject mode) and
    /// the `ralph-tools` skill (when memories or tasks are enabled) to the
    /// prompt prefix.
    ///
    /// Memory loading is best-effort: a failed load injects nothing.
    /// Memory content is truncated to the configured character budget
    /// when one is set.
    fn inject_memories_and_tools_skill(&self, prefix: &mut String) {
        let memories_config = &self.config.memories;

        if memories_config.enabled && memories_config.inject == InjectMode::Auto {
            info!(
                "Memory injection check: enabled={}, inject={:?}, workspace_root={:?}",
                memories_config.enabled, memories_config.inject, self.config.core.workspace_root
            );

            let workspace_root = &self.config.core.workspace_root;
            let store = MarkdownMemoryStore::with_default_path(workspace_root);
            // Path recomputed here only for the log lines below.
            let memories_path = workspace_root.join(".ralph/agent/memories.md");

            info!(
                "Looking for memories at: {:?} (exists: {})",
                memories_path,
                memories_path.exists()
            );

            // Load failures degrade to "no memories" rather than erroring.
            let memories = match store.load() {
                Ok(memories) => {
                    info!("Successfully loaded {} memories from store", memories.len());
                    memories
                }
                Err(e) => {
                    info!(
                        "Failed to load memories for injection: {} (path: {:?})",
                        e, memories_path
                    );
                    Vec::new()
                }
            };

            if memories.is_empty() {
                info!("Memory store is empty - no memories to inject");
            } else {
                let mut memories_content = format_memories_as_markdown(&memories);

                // A budget of 0 means "unlimited".
                if memories_config.budget > 0 {
                    let original_len = memories_content.len();
                    memories_content =
                        truncate_to_budget(&memories_content, memories_config.budget);
                    debug!(
                        "Applied budget: {} chars -> {} chars (budget: {})",
                        original_len,
                        memories_content.len(),
                        memories_config.budget
                    );
                }

                info!(
                    "Injecting {} memories ({} chars) into prompt",
                    memories.len(),
                    memories_content.len()
                );

                prefix.push_str(&memories_content);
            }
        }

        // The tools skill is useful for both memories and tasks features.
        if memories_config.enabled || self.config.tasks.enabled {
            if let Some(skill) = self.skill_registry.get("ralph-tools") {
                if !prefix.is_empty() {
                    prefix.push_str("\n\n");
                }
                prefix.push_str(&format!(
                    "<ralph-tools-skill>\n{}\n</ralph-tools-skill>",
                    skill.content.trim()
                ));
                debug!("Injected ralph-tools skill from registry");
            } else {
                debug!("ralph-tools skill not found in registry - skill content not injected");
            }
        }
    }
1017
1018 fn inject_robot_skill(&self, prefix: &mut String) {
1023 if !self.config.robot.enabled {
1024 return;
1025 }
1026
1027 if let Some(skill) = self.skill_registry.get("robot-interaction") {
1028 if !prefix.is_empty() {
1029 prefix.push_str("\n\n");
1030 }
1031 prefix.push_str(&format!(
1032 "<robot-skill>\n{}\n</robot-skill>",
1033 skill.content.trim()
1034 ));
1035 debug!("Injected robot interaction skill from registry");
1036 }
1037 }
1038
1039 fn inject_custom_auto_skills(&self, prefix: &mut String) {
1041 for skill in self.skill_registry.auto_inject_skills(None) {
1042 if skill.name == "ralph-tools" || skill.name == "robot-interaction" {
1044 continue;
1045 }
1046
1047 if !prefix.is_empty() {
1048 prefix.push_str("\n\n");
1049 }
1050 prefix.push_str(&format!(
1051 "<{name}-skill>\n{content}\n</{name}-skill>",
1052 name = skill.name,
1053 content = skill.content.trim()
1054 ));
1055 debug!("Injected auto-inject skill: {}", skill.name);
1056 }
1057 }
1058
1059 fn prepend_scratchpad(&self, prompt: String) -> String {
1065 let scratchpad_path = self.scratchpad_path();
1066
1067 let resolved_path = if scratchpad_path.is_relative() {
1068 self.config.core.workspace_root.join(&scratchpad_path)
1069 } else {
1070 scratchpad_path
1071 };
1072
1073 if !resolved_path.exists() {
1074 debug!(
1075 "Scratchpad not found at {:?}, skipping injection",
1076 resolved_path
1077 );
1078 return prompt;
1079 }
1080
1081 let content = match std::fs::read_to_string(&resolved_path) {
1082 Ok(c) => c,
1083 Err(e) => {
1084 info!("Failed to read scratchpad for injection: {}", e);
1085 return prompt;
1086 }
1087 };
1088
1089 if content.trim().is_empty() {
1090 debug!("Scratchpad is empty, skipping injection");
1091 return prompt;
1092 }
1093
1094 let char_budget = 4000 * 4;
1096 let content = if content.len() > char_budget {
1097 let start = content.len() - char_budget;
1099 let line_start = content[start..].find('\n').map_or(start, |n| start + n + 1);
1100 let discarded = &content[..line_start];
1101
1102 let headings: Vec<&str> = discarded
1104 .lines()
1105 .filter(|line| line.starts_with('#'))
1106 .collect();
1107 let summary = if headings.is_empty() {
1108 format!(
1109 "<!-- earlier content truncated ({} chars omitted) -->",
1110 line_start
1111 )
1112 } else {
1113 format!(
1114 "<!-- earlier content truncated ({} chars omitted) -->\n\
1115 <!-- discarded sections: {} -->",
1116 line_start,
1117 headings.join(" | ")
1118 )
1119 };
1120
1121 format!("{}\n\n{}", summary, &content[line_start..])
1122 } else {
1123 content
1124 };
1125
1126 info!("Injecting scratchpad ({} chars) into prompt", content.len());
1127
1128 let mut final_prompt = format!(
1129 "<scratchpad path=\"{}\">\n{}\n</scratchpad>\n\n",
1130 self.config.core.scratchpad, content
1131 );
1132 final_prompt.push_str(&prompt);
1133 final_prompt
1134 }
1135
    /// Inject a `<ready-tasks>` summary (ready, open, blocked, and closed
    /// counts) ahead of the prompt when the tasks feature is enabled.
    ///
    /// Missing or unloadable task stores, and stores with no history at
    /// all, leave the prompt unchanged.
    fn prepend_ready_tasks(&self, prompt: String) -> String {
        if !self.config.tasks.enabled {
            return prompt;
        }

        use crate::task::TaskStatus;
        use crate::task_store::TaskStore;

        // Relative task paths are resolved against the workspace root.
        let tasks_path = self.tasks_path();
        let resolved_path = if tasks_path.is_relative() {
            self.config.core.workspace_root.join(&tasks_path)
        } else {
            tasks_path
        };

        if !resolved_path.exists() {
            return prompt;
        }

        let store = match TaskStore::load(&resolved_path) {
            Ok(s) => s,
            Err(e) => {
                info!("Failed to load task store for injection: {}", e);
                return prompt;
            }
        };

        let ready = store.ready();
        let open = store.open();
        let closed_count = store.all().len() - open.len();

        // A store with no open tasks and no history adds nothing useful.
        if open.is_empty() && closed_count == 0 {
            return prompt;
        }

        let mut section = String::from("<ready-tasks>\n");
        if ready.is_empty() && open.is_empty() {
            section.push_str("No open tasks. Create tasks with `ralph tools task add`.\n");
        } else {
            section.push_str(&format!(
                "## Tasks: {} ready, {} open, {} closed\n\n",
                ready.len(),
                open.len(),
                closed_count
            ));
            for task in &ready {
                let status_icon = match task.status {
                    TaskStatus::Open => "[ ]",
                    TaskStatus::InProgress => "[~]",
                    _ => "[?]",
                };
                section.push_str(&format!(
                    "- {} [P{}] {} ({})\n",
                    status_icon, task.priority, task.title, task.id
                ));
            }
            // Open tasks that are not ready are blocked by dependencies.
            let ready_ids: Vec<&str> = ready.iter().map(|t| t.id.as_str()).collect();
            let blocked: Vec<_> = open
                .iter()
                .filter(|t| !ready_ids.contains(&t.id.as_str()))
                .collect();
            if !blocked.is_empty() {
                section.push_str("\nBlocked:\n");
                for task in blocked {
                    section.push_str(&format!(
                        "- [blocked] [P{}] {} ({}) — blocked by: {}\n",
                        task.priority,
                        task.title,
                        task.id,
                        task.blocked_by.join(", ")
                    ));
                }
            }
        }
        section.push_str("</ready-tasks>\n\n");

        info!(
            "Injecting ready tasks ({} ready, {} open, {} closed) into prompt",
            ready.len(),
            open.len(),
            closed_count
        );

        let mut final_prompt = section;
        final_prompt.push_str(&prompt);
        final_prompt
    }
1229
    /// Build a Ralph prompt directly from raw content, with no active hats.
    pub fn build_ralph_prompt(&self, prompt_content: &str) -> String {
        self.ralph.build_prompt(prompt_content, &[])
    }
1234
1235 fn determine_active_hats(&self, events: &[Event]) -> Vec<&Hat> {
1238 let mut active_hats = Vec::new();
1239 for id in self.determine_active_hat_ids(events) {
1240 if let Some(hat) = self.registry.get(&id) {
1241 active_hats.push(hat);
1242 }
1243 }
1244 active_hats
1245 }
1246
1247 fn determine_active_hat_ids(&self, events: &[Event]) -> Vec<HatId> {
1248 let mut active_hat_ids = Vec::new();
1249 for event in events {
1250 if let Some(hat) = self.registry.get_for_topic(event.topic.as_str()) {
1251 if !active_hat_ids.iter().any(|id| id == &hat.id) {
1253 active_hat_ids.push(hat.id.clone());
1254 }
1255 }
1256 }
1257 active_hat_ids
1258 }
1259
1260 fn format_event(event: &Event) -> String {
1265 let topic = &event.topic;
1266 let payload = &event.payload;
1267
1268 if topic.as_str() == "task.start" || topic.as_str() == "task.resume" {
1269 format!(
1270 "Event: {} - <top-level-prompt>\n{}\n</top-level-prompt>",
1271 topic, payload
1272 )
1273 } else {
1274 format!("Event: {} - {}", topic, payload)
1275 }
1276 }
1277
    /// Decide whether a hat has exceeded its `max_activations` budget.
    ///
    /// Returns `(drop_pending, exhausted_event)`:
    /// - `(false, None)` when the hat has no cap or is under it;
    /// - `(true, Some(event))` the first time the cap is hit, with an
    ///   `<hat>.exhausted` event describing the dropped topics;
    /// - `(true, None)` on subsequent hits (the notice is emitted once).
    fn check_hat_exhaustion(&mut self, hat_id: &HatId, dropped: &[Event]) -> (bool, Option<Event>) {
        let Some(config) = self.registry.get_config(hat_id) else {
            return (false, None);
        };
        let Some(max) = config.max_activations else {
            return (false, None);
        };

        let count = *self.state.hat_activation_counts.get(hat_id).unwrap_or(&0);
        if count < max {
            return (false, None);
        }

        // `insert` returns true only on first insertion, so the exhausted
        // notification is emitted exactly once per hat.
        let should_emit = self.state.exhausted_hats.insert(hat_id.clone());

        if !should_emit {
            return (true, None);
        }

        // Sorted topic list for deterministic payloads.
        let mut dropped_topics: Vec<String> = dropped.iter().map(|e| e.topic.to_string()).collect();
        dropped_topics.sort();

        let payload = format!(
            "Hat '{hat}' exhausted.\n- max_activations: {max}\n- activations: {count}\n- dropped_topics:\n  - {topics}",
            hat = hat_id.as_str(),
            max = max,
            count = count,
            topics = dropped_topics.join("\n  - ")
        );

        warn!(
            hat = %hat_id.as_str(),
            max_activations = max,
            activations = count,
            "Hat exhausted (max_activations reached)"
        );

        (
            true,
            Some(Event::new(
                format!("{}.exhausted", hat_id.as_str()),
                payload,
            )),
        )
    }
1325
1326 fn record_hat_activations(&mut self, active_hat_ids: &[HatId]) {
1327 for hat_id in active_hat_ids {
1328 *self
1329 .state
1330 .hat_activation_counts
1331 .entry(hat_id.clone())
1332 .or_insert(0) += 1;
1333 }
1334 }
1335
1336 pub fn get_active_hat_id(&self) -> HatId {
1339 for hat_id in self.bus.hat_ids() {
1341 let Some(events) = self.bus.peek_pending(hat_id) else {
1342 continue;
1343 };
1344 let Some(event) = events.first() else {
1345 continue;
1346 };
1347 if let Some(active_hat) = self.registry.get_for_topic(event.topic.as_str()) {
1348 return active_hat.id.clone();
1349 }
1350 }
1351 HatId::new("ralph")
1352 }
1353
    /// Advance the event reader past any newly appended events and return
    /// how many were read (0 on read failure).
    pub fn record_event_count(&mut self) -> usize {
        self.event_reader
            .read_new_events()
            .map(|r| r.events.len())
            .unwrap_or(0)
    }
1364
    /// After a hat's iteration, publish its configured `default_publishes`
    /// event if the hat wrote no events of its own.
    ///
    /// NOTE(review): `_events_before` is currently unused — the check reads
    /// only the events appended since the last reader position.
    pub fn check_default_publishes(&mut self, hat_id: &HatId, _events_before: usize) {
        let events_after = self
            .event_reader
            .read_new_events()
            .map(|r| r.events.len())
            .unwrap_or(0)

        if events_after == 0
            && let Some(config) = self.registry.get_config(hat_id)
            && let Some(default_topic) = &config.default_publishes
        {
            // Empty payload; the topic alone drives downstream routing.
            let default_event = Event::new(default_topic.as_str(), "").with_source(hat_id.clone());

            debug!(
                hat = %hat_id.as_str(),
                topic = %default_topic,
                "No events written by hat, injecting default_publishes event"
            );

            self.bus.publish(default_event);
        }
    }
1393
    /// Mutable access to the underlying event bus.
    pub fn bus(&mut self) -> &mut EventBus {
        &mut self.bus
    }
1401
    /// Record the outcome of one iteration and decide whether to stop.
    ///
    /// Bumps the iteration counter, remembers the last hat, sends a
    /// periodic Telegram check-in when due, logs diagnostics, tracks
    /// consecutive failures, and scans Ralph's output for the completion
    /// promise. Returns `Some` when the loop should terminate.
    pub fn process_output(
        &mut self,
        hat_id: &HatId,
        output: &str,
        success: bool,
    ) -> Option<TerminationReason> {
        self.state.iteration += 1;
        self.state.last_hat = Some(hat_id.clone());

        // Periodic Telegram check-in, rate-limited to the configured
        // interval; failures are logged and do not affect the loop.
        if let Some(interval_secs) = self.config.robot.checkin_interval_seconds
            && let Some(ref telegram_service) = self.telegram_service
        {
            let elapsed = self.state.elapsed();
            let interval = std::time::Duration::from_secs(interval_secs);
            // Before the first check-in, measure from loop start.
            let last = self
                .state
                .last_checkin_at
                .map(|t| t.elapsed())
                .unwrap_or(elapsed);

            if last >= interval {
                let context = self.build_checkin_context(hat_id);
                match telegram_service.send_checkin(self.state.iteration, elapsed, Some(&context)) {
                    Ok(_) => {
                        self.state.last_checkin_at = Some(std::time::Instant::now());
                        debug!(iteration = self.state.iteration, "Sent Telegram check-in");
                    }
                    Err(e) => {
                        warn!(error = %e, "Failed to send Telegram check-in");
                    }
                }
            }
        }

        self.diagnostics.log_orchestration(
            self.state.iteration,
            "loop",
            crate::diagnostics::OrchestrationEvent::IterationStarted,
        );

        self.diagnostics.log_orchestration(
            self.state.iteration,
            "loop",
            crate::diagnostics::OrchestrationEvent::HatSelected {
                hat: hat_id.to_string(),
                reason: "process_output".to_string(),
            },
        );

        // Failure streaks reset on any success.
        if success {
            self.state.consecutive_failures = 0;
        } else {
            self.state.consecutive_failures += 1;
        }

        // Only Ralph's output can complete the loop.
        if hat_id.as_str() == "ralph"
            && EventParser::contains_promise(output, &self.config.event_loop.completion_promise)
        {
            // Incomplete-work checks are advisory: the agent's completion
            // decision is trusted either way, but a warning is logged.
            if self.config.memories.enabled {
                if let Ok(false) = self.verify_tasks_complete() {
                    let open_tasks = self.get_open_task_list();
                    warn!(
                        open_tasks = ?open_tasks,
                        "LOOP_COMPLETE with {} open task(s) - trusting agent decision",
                        open_tasks.len()
                    );
                }
            } else if let Ok(false) = self.verify_scratchpad_complete() {
                warn!("LOOP_COMPLETE with pending scratchpad tasks - trusting agent decision");
            }

            info!("LOOP_COMPLETE detected - terminating");

            self.diagnostics.log_orchestration(
                self.state.iteration,
                "loop",
                crate::diagnostics::OrchestrationEvent::LoopTerminated {
                    reason: "completion_promise".to_string(),
                },
            );

            return Some(TerminationReason::CompletionPromise);
        }

        // No promise: fall through to the standard limit checks.
        self.check_termination()
    }
1506
1507 fn extract_task_id(payload: &str) -> String {
1510 payload
1511 .lines()
1512 .next()
1513 .unwrap_or("unknown")
1514 .trim()
1515 .to_string()
1516 }
1517
    /// Accumulates `cost` into the loop state's running cumulative cost.
    pub fn add_cost(&mut self, cost: f64) {
        self.state.cumulative_cost += cost;
    }
1522
1523 fn verify_scratchpad_complete(&self) -> Result<bool, std::io::Error> {
1530 let scratchpad_path = self.scratchpad_path();
1531
1532 if !scratchpad_path.exists() {
1533 return Err(std::io::Error::new(
1534 std::io::ErrorKind::NotFound,
1535 "Scratchpad does not exist",
1536 ));
1537 }
1538
1539 let content = std::fs::read_to_string(scratchpad_path)?;
1540
1541 let has_pending = content
1542 .lines()
1543 .any(|line| line.trim_start().starts_with("- [ ]"));
1544
1545 Ok(!has_pending)
1546 }
1547
1548 fn verify_tasks_complete(&self) -> Result<bool, std::io::Error> {
1549 use crate::task_store::TaskStore;
1550
1551 let tasks_path = self.tasks_path();
1552
1553 if !tasks_path.exists() {
1555 return Ok(true);
1556 }
1557
1558 let store = TaskStore::load(&tasks_path)?;
1559 Ok(!store.has_pending_tasks())
1560 }
1561
1562 fn build_checkin_context(&self, hat_id: &HatId) -> ralph_telegram::CheckinContext {
1564 let (open_tasks, closed_tasks) = self.count_tasks();
1565 ralph_telegram::CheckinContext {
1566 current_hat: Some(hat_id.as_str().to_string()),
1567 open_tasks,
1568 closed_tasks,
1569 cumulative_cost: self.state.cumulative_cost,
1570 }
1571 }
1572
1573 fn count_tasks(&self) -> (usize, usize) {
1578 use crate::task::TaskStatus;
1579 use crate::task_store::TaskStore;
1580
1581 let tasks_path = self.tasks_path();
1582 if !tasks_path.exists() {
1583 return (0, 0);
1584 }
1585
1586 match TaskStore::load(&tasks_path) {
1587 Ok(store) => {
1588 let total = store.all().len();
1589 let open = store.open().len();
1590 let closed = total - open;
1591 debug_assert_eq!(
1593 closed,
1594 store
1595 .all()
1596 .iter()
1597 .filter(|t| t.status == TaskStatus::Closed)
1598 .count()
1599 );
1600 (open, closed)
1601 }
1602 Err(_) => (0, 0),
1603 }
1604 }
1605
1606 fn get_open_task_list(&self) -> Vec<String> {
1608 use crate::task_store::TaskStore;
1609
1610 let tasks_path = self.tasks_path();
1611 if let Ok(store) = TaskStore::load(&tasks_path) {
1612 return store
1613 .open()
1614 .iter()
1615 .map(|t| format!("{}: {}", t.id, t.title))
1616 .collect();
1617 }
1618 vec![]
1619 }
1620
    /// Ingests newly-appended JSONL events: reports malformed lines,
    /// enforces evidence gating on `build.done` / `review.done`, tracks
    /// blocked/abandoned tasks, handles `interact.human` via Telegram, and
    /// republishes the validated events onto the bus.
    ///
    /// Returns `Ok(true)` when at least one published event had no
    /// registered subscriber for its topic (an "orphan"), `Ok(false)`
    /// otherwise (including the no-new-lines case).
    pub fn process_events_from_jsonl(&mut self) -> std::io::Result<bool> {
        let result = self.event_reader.read_new_events()?;

        // Surface each malformed line as an `event.malformed` event and
        // extend the consecutive-malformed streak.
        for malformed in &result.malformed {
            let payload = format!(
                "Line {}: {}\nContent: {}",
                malformed.line_number, malformed.error, &malformed.content
            );
            let event = Event::new("event.malformed", &payload);
            self.bus.publish(event);
            self.state.consecutive_malformed_events += 1;
            warn!(
                line = malformed.line_number,
                consecutive = self.state.consecutive_malformed_events,
                "Malformed event line detected"
            );
        }

        // Any well-formed event resets the malformed streak.
        if !result.events.is_empty() {
            self.state.consecutive_malformed_events = 0;
        }

        // Nothing new at all: report "no orphans" without touching state.
        if result.events.is_empty() && result.malformed.is_empty() {
            return Ok(false);
        }

        let mut has_orphans = false;

        // Evidence gating: `build.done` must carry backpressure evidence
        // (tests/lint/typecheck) and `review.done` must carry verification
        // evidence (tests/build); otherwise the event is replaced by a
        // corresponding `*.blocked` event. All other topics pass through.
        let mut validated_events = Vec::new();
        for event in result.events {
            let payload = event.payload.clone().unwrap_or_default();

            if event.topic == "build.done" {
                if let Some(evidence) = EventParser::parse_backpressure_evidence(&payload) {
                    if evidence.all_passed() {
                        validated_events.push(Event::new(event.topic.as_str(), &payload));
                    } else {
                        warn!(
                            tests = evidence.tests_passed,
                            lint = evidence.lint_passed,
                            typecheck = evidence.typecheck_passed,
                            "build.done rejected: backpressure checks failed"
                        );

                        self.diagnostics.log_orchestration(
                            self.state.iteration,
                            "jsonl",
                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                                reason: format!(
                                    "backpressure checks failed: tests={}, lint={}, typecheck={}",
                                    evidence.tests_passed,
                                    evidence.lint_passed,
                                    evidence.typecheck_passed
                                ),
                            },
                        );

                        validated_events.push(Event::new(
                            "build.blocked",
                            "Backpressure checks failed. Fix tests/lint/typecheck before emitting build.done.",
                        ));
                    }
                } else {
                    // Evidence missing entirely (not merely failing).
                    warn!("build.done rejected: missing backpressure evidence");

                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                            reason: "missing backpressure evidence".to_string(),
                        },
                    );

                    validated_events.push(Event::new(
                        "build.blocked",
                        "Missing backpressure evidence. Include 'tests: pass', 'lint: pass', 'typecheck: pass' in build.done payload.",
                    ));
                }
            } else if event.topic == "review.done" {
                if let Some(evidence) = EventParser::parse_review_evidence(&payload) {
                    if evidence.is_verified() {
                        validated_events.push(Event::new(event.topic.as_str(), &payload));
                    } else {
                        warn!(
                            tests = evidence.tests_passed,
                            build = evidence.build_passed,
                            "review.done rejected: verification checks failed"
                        );

                        self.diagnostics.log_orchestration(
                            self.state.iteration,
                            "jsonl",
                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                                reason: format!(
                                    "review verification failed: tests={}, build={}",
                                    evidence.tests_passed, evidence.build_passed
                                ),
                            },
                        );

                        validated_events.push(Event::new(
                            "review.blocked",
                            "Review verification failed. Run tests and build before emitting review.done.",
                        ));
                    }
                } else {
                    warn!("review.done rejected: missing verification evidence");

                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                            reason: "missing review verification evidence".to_string(),
                        },
                    );

                    validated_events.push(Event::new(
                        "review.blocked",
                        "Missing verification evidence. Include 'tests: pass' and 'build: pass' in review.done payload.",
                    ));
                }
            } else {
                // All other topics are passed through unvalidated.
                validated_events.push(Event::new(event.topic.as_str(), &payload));
            }
        }

        // Abandonment tracking considers only `build.blocked` events.
        // NOTE(review): `review.blocked` is excluded here — confirm whether
        // review blocks should also count toward task abandonment.
        let blocked_events: Vec<_> = validated_events
            .iter()
            .filter(|e| e.topic == "build.blocked".into())
            .collect();

        for blocked_event in &blocked_events {
            let task_id = Self::extract_task_id(&blocked_event.payload);

            let count = self
                .state
                .task_block_counts
                .entry(task_id.clone())
                .or_insert(0);
            *count += 1;

            debug!(
                task_id = %task_id,
                block_count = *count,
                "Task blocked"
            );

            // Three blocks for the same task id => abandon it (once).
            if *count >= 3 && !self.state.abandoned_tasks.contains(&task_id) {
                warn!(
                    task_id = %task_id,
                    "Task abandoned after 3 consecutive blocks"
                );

                self.state.abandoned_tasks.push(task_id.clone());

                self.diagnostics.log_orchestration(
                    self.state.iteration,
                    "jsonl",
                    crate::diagnostics::OrchestrationEvent::TaskAbandoned {
                        reason: format!(
                            "3 consecutive build.blocked events for task '{}'",
                            task_id
                        ),
                    },
                );

                let abandoned_event = Event::new(
                    "build.task.abandoned",
                    format!(
                        "Task '{}' abandoned after 3 consecutive build.blocked events",
                        task_id
                    ),
                );

                self.bus.publish(abandoned_event);
            }
        }

        // Consecutive-blocked streak feeds the loop-thrashing check.
        let has_blocked_event = !blocked_events.is_empty();

        if has_blocked_event {
            self.state.consecutive_blocked += 1;
        } else {
            self.state.consecutive_blocked = 0;
            self.state.last_blocked_hat = None;
        }

        // `interact.human`: relay the first such event to Telegram and
        // (synchronously) wait for a human reply, which — if received — is
        // later republished as a `human.response` event.
        let mut response_event = None;
        let ask_human_idx = validated_events
            .iter()
            .position(|e| e.topic == "interact.human".into());

        if let Some(idx) = ask_human_idx {
            let ask_event = &validated_events[idx];
            let payload = ask_event.payload.clone();

            if let Some(ref telegram_service) = self.telegram_service {
                info!(
                    payload = %payload,
                    "interact.human event detected — sending question via Telegram"
                );

                let send_ok = match telegram_service.send_question(&payload) {
                    Ok(_message_id) => true,
                    Err(e) => {
                        // Send failure is treated like a timeout: the loop
                        // continues without a human response.
                        warn!(
                            error = %e,
                            "Failed to send interact.human question after retries — treating as timeout"
                        );
                        self.diagnostics.log_error(
                            self.state.iteration,
                            "telegram",
                            crate::diagnostics::DiagnosticError::TelegramSendError {
                                operation: "send_question".to_string(),
                                error: e.to_string(),
                                retry_count: ralph_telegram::MAX_SEND_RETRIES,
                            },
                        );
                        false
                    }
                };

                if send_ok {
                    // Responses are observed via the events file; fall back
                    // to the default path when no loop context is set.
                    let events_path = self
                        .loop_context
                        .as_ref()
                        .map(|ctx| ctx.events_path())
                        .unwrap_or_else(|| PathBuf::from(".ralph/events.jsonl"));

                    match telegram_service.wait_for_response(&events_path) {
                        Ok(Some(response)) => {
                            info!(
                                response = %response,
                                "Received human.response — continuing loop"
                            );
                            response_event = Some(Event::new("human.response", &response));
                        }
                        Ok(None) => {
                            warn!(
                                timeout_secs = telegram_service.timeout_secs(),
                                "Human response timeout — continuing without response"
                            );
                        }
                        Err(e) => {
                            warn!(
                                error = %e,
                                "Error waiting for human response — continuing without response"
                            );
                        }
                    }
                }
            } else {
                // No Telegram: the event still reaches the bus below.
                debug!(
                    "interact.human event detected but no Telegram service active — passing through"
                );
            }
        }

        // Publish every validated event, noting topics with no subscriber.
        for event in validated_events {
            self.diagnostics.log_orchestration(
                self.state.iteration,
                "jsonl",
                crate::diagnostics::OrchestrationEvent::EventPublished {
                    topic: event.topic.to_string(),
                },
            );

            if !self.registry.has_subscriber(event.topic.as_str()) {
                has_orphans = true;
            }

            debug!(
                topic = %event.topic,
                "Publishing event from JSONL"
            );
            self.bus.publish(event);
        }

        // Publish the human response last, after the question that
        // triggered it.
        if let Some(response) = response_event {
            info!(
                topic = %response.topic,
                "Publishing human.response event from Telegram"
            );
            self.bus.publish(response);
        }

        Ok(has_orphans)
    }
1943
    /// Returns true when `output` contains the configured completion
    /// promise, as detected by `EventParser::contains_promise`.
    pub fn check_ralph_completion(&self, output: &str) -> bool {
        EventParser::contains_promise(output, &self.config.event_loop.completion_promise)
    }
1950
1951 pub fn publish_terminate_event(&mut self, reason: &TerminationReason) -> Event {
1958 self.stop_telegram_service();
1960
1961 let elapsed = self.state.elapsed();
1962 let duration_str = format_duration(elapsed);
1963
1964 let payload = format!(
1965 "## Reason\n{}\n\n## Status\n{}\n\n## Summary\n- Iterations: {}\n- Duration: {}\n- Exit code: {}",
1966 reason.as_str(),
1967 termination_status_text(reason),
1968 self.state.iteration,
1969 duration_str,
1970 reason.exit_code()
1971 );
1972
1973 let event = Event::new("loop.terminate", &payload);
1974
1975 self.bus.publish(event.clone());
1977
1978 info!(
1979 reason = %reason.as_str(),
1980 iterations = self.state.iteration,
1981 duration = %duration_str,
1982 "Wrapping up: {}. {} iterations in {}.",
1983 reason.as_str(),
1984 self.state.iteration,
1985 duration_str
1986 );
1987
1988 event
1989 }
1990
    /// Shutdown flag of the active Telegram service, if any, for signaling
    /// it from another thread.
    pub fn telegram_shutdown_flag(&self) -> Option<Arc<AtomicBool>> {
        self.telegram_service.as_ref().map(|s| s.shutdown_flag())
    }
1998
    /// Shared access to the Telegram service, if one is active.
    pub fn telegram_service(&self) -> Option<&TelegramService> {
        self.telegram_service.as_ref()
    }
2003
    /// Mutable access to the Telegram service, if one is active.
    pub fn telegram_service_mut(&mut self) -> Option<&mut TelegramService> {
        self.telegram_service.as_mut()
    }
2008
    /// Stops and drops the Telegram service if one is active; `take` makes
    /// subsequent calls no-ops.
    fn stop_telegram_service(&mut self) {
        if let Some(service) = self.telegram_service.take() {
            service.stop();
        }
    }
2017
2018 pub fn check_for_user_prompt(&self, events: &[Event]) -> Option<UserPrompt> {
2026 events
2027 .iter()
2028 .find(|e| e.topic.as_str() == "user.prompt")
2029 .map(|e| UserPrompt {
2030 id: Self::extract_prompt_id(&e.payload),
2031 text: e.payload.clone(),
2032 })
2033 }
2034
2035 fn extract_prompt_id(payload: &str) -> String {
2040 if let Some(start) = payload.find("id=\"")
2042 && let Some(end) = payload[start + 4..].find('"')
2043 {
2044 return payload[start + 4..start + 4 + end].to_string();
2045 }
2046
2047 format!("q{}", Self::generate_prompt_id())
2049 }
2050
2051 fn generate_prompt_id() -> String {
2054 use std::time::{SystemTime, UNIX_EPOCH};
2055 let nanos = SystemTime::now()
2056 .duration_since(UNIX_EPOCH)
2057 .unwrap()
2058 .as_nanos();
2059 format!("{:x}", nanos % 0xFFFF_FFFF)
2060 }
2061}
2062
/// A prompt directed at the human user, extracted from a `user.prompt`
/// event by `check_for_user_prompt`.
#[derive(Debug, Clone)]
pub struct UserPrompt {
    // Identifier parsed from an `id="..."` marker in the payload, or a
    // generated fallback id.
    pub id: String,
    // The full `user.prompt` event payload text.
    pub text: String,
}
2073
/// Errors that can occur while waiting for a user's response to a prompt.
#[derive(Debug, thiserror::Error)]
pub enum UserPromptError {
    // The configured wait deadline elapsed with no matching response.
    #[error("Timeout waiting for user response")]
    Timeout,

    // The wait was cancelled via the interrupt channel.
    #[error("Interrupted while waiting for user response")]
    Interrupted,

    // Reading the conversation file failed.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),
}
2086
/// Asynchronously waits for a `UserResponse` conversation entry matching
/// `prompt_id` to appear in `conversation_path`, polling every 100ms.
///
/// Returns the response text, `UserPromptError::Timeout` once
/// `timeout_secs` elapses, or `UserPromptError::Interrupted` when
/// `interrupt_rx` observes `true`.
#[allow(dead_code)]
pub async fn wait_for_user_response_async(
    conversation_path: &std::path::Path,
    prompt_id: &str,
    timeout_secs: u64,
    mut interrupt_rx: Option<&mut tokio::sync::watch::Receiver<bool>>,
) -> Result<String, UserPromptError> {
    use tokio::time::{Duration, sleep, timeout};

    let poll_interval = Duration::from_millis(100);

    let result = timeout(Duration::from_secs(timeout_secs), async {
        loop {
            // Cooperative cancellation: a `true` on the watch channel aborts
            // the wait immediately.
            if let Some(rx) = &mut interrupt_rx
                && *rx.borrow()
            {
                return Err(UserPromptError::Interrupted);
            }

            // NOTE(review): find_response_in_file does blocking std::fs I/O
            // on the async runtime; fine for small files, but consider
            // spawn_blocking or tokio::fs if the conversation log grows.
            if let Some(response) = find_response_in_file(conversation_path, prompt_id)? {
                return Ok(response);
            }

            sleep(poll_interval).await;
        }
    })
    .await;

    match result {
        // Inner future finished: a response, an interrupt, or an I/O error.
        Ok(r) => r,
        // The tokio timeout elapsed before the inner future completed.
        Err(_) => Err(UserPromptError::Timeout),
    }
}
2138
2139#[allow(dead_code)]
2154pub fn wait_for_user_response(
2155 conversation_path: &std::path::Path,
2156 prompt_id: &str,
2157 timeout_secs: u64,
2158) -> Result<String, UserPromptError> {
2159 use std::thread;
2160 use std::time::{Duration, Instant};
2161
2162 let deadline = Instant::now() + Duration::from_secs(timeout_secs);
2163 let poll_interval = Duration::from_millis(100);
2164
2165 loop {
2166 if Instant::now() >= deadline {
2168 return Err(UserPromptError::Timeout);
2169 }
2170
2171 if let Some(response) = find_response_in_file(conversation_path, prompt_id)? {
2173 return Ok(response);
2174 }
2175
2176 thread::sleep(poll_interval);
2178 }
2179}
2180
2181#[allow(dead_code)]
2183fn find_response_in_file(
2184 conversation_path: &std::path::Path,
2185 prompt_id: &str,
2186) -> Result<Option<String>, UserPromptError> {
2187 if !conversation_path.exists() {
2188 return Ok(None);
2189 }
2190
2191 let content = std::fs::read_to_string(conversation_path)?;
2192
2193 for line in content.lines() {
2194 if let Ok(entry) = serde_json::from_str::<crate::planning_session::ConversationEntry>(line)
2195 && entry.entry_type == crate::planning_session::ConversationType::UserResponse
2196 && entry.id == prompt_id
2197 {
2198 return Ok(Some(entry.text));
2199 }
2200 }
2201
2202 Ok(None)
2203}
2204
/// Formats a duration as "Hh Mm Ss", dropping leading zero units:
/// "2h 2m 5s", "1m 1s", or "5s".
fn format_duration(d: Duration) -> String {
    let secs = d.as_secs();
    let (h, m, s) = (secs / 3600, (secs % 3600) / 60, secs % 60);

    match (h, m) {
        (0, 0) => format!("{}s", s),
        (0, _) => format!("{}m {}s", m, s),
        _ => format!("{}h {}m {}s", h, m, s),
    }
}
2220
/// Maps a termination reason to the human-readable status line embedded in
/// the `loop.terminate` payload by `publish_terminate_event`.
fn termination_status_text(reason: &TerminationReason) -> &'static str {
    match reason {
        TerminationReason::CompletionPromise => "All tasks completed successfully.",
        TerminationReason::MaxIterations => "Stopped at iteration limit.",
        TerminationReason::MaxRuntime => "Stopped at runtime limit.",
        TerminationReason::MaxCost => "Stopped at cost limit.",
        TerminationReason::ConsecutiveFailures => "Too many consecutive failures.",
        TerminationReason::LoopThrashing => {
            "Loop thrashing detected - same hat repeatedly blocked."
        }
        TerminationReason::ValidationFailure => "Too many consecutive malformed JSONL events.",
        TerminationReason::Stopped => "Manually stopped.",
        TerminationReason::Interrupted => "Interrupted by signal.",
        TerminationReason::ChaosModeComplete => "Chaos mode exploration complete.",
        TerminationReason::ChaosModeMaxIterations => "Chaos mode stopped at iteration limit.",
        TerminationReason::RestartRequested => "Restarting by human request.",
    }
}