1mod loop_state;
6#[cfg(test)]
7mod tests;
8
9pub use loop_state::LoopState;
10
11use crate::config::{HatBackend, InjectMode, RalphConfig};
12use crate::event_parser::EventParser;
13use crate::event_reader::EventReader;
14use crate::hat_registry::HatRegistry;
15use crate::hatless_ralph::HatlessRalph;
16use crate::instructions::InstructionBuilder;
17use crate::loop_context::LoopContext;
18use crate::memory_store::{MarkdownMemoryStore, format_memories_as_markdown, truncate_to_budget};
19use crate::skill_registry::SkillRegistry;
20use ralph_proto::{Event, EventBus, Hat, HatId};
21use ralph_telegram::TelegramService;
22use std::path::PathBuf;
23use std::sync::Arc;
24use std::sync::atomic::AtomicBool;
25use std::time::Duration;
26use tracing::{debug, info, warn};
27
28#[derive(Debug, Clone, PartialEq, Eq)]
30pub enum TerminationReason {
31 CompletionPromise,
33 MaxIterations,
35 MaxRuntime,
37 MaxCost,
39 ConsecutiveFailures,
41 LoopThrashing,
43 ValidationFailure,
45 Stopped,
47 Interrupted,
49 ChaosModeComplete,
51 ChaosModeMaxIterations,
53 RestartRequested,
55}
56
57impl TerminationReason {
58 pub fn exit_code(&self) -> i32 {
66 match self {
67 TerminationReason::CompletionPromise | TerminationReason::ChaosModeComplete => 0,
68 TerminationReason::ConsecutiveFailures
69 | TerminationReason::LoopThrashing
70 | TerminationReason::ValidationFailure
71 | TerminationReason::Stopped => 1,
72 TerminationReason::MaxIterations
73 | TerminationReason::MaxRuntime
74 | TerminationReason::MaxCost
75 | TerminationReason::ChaosModeMaxIterations => 2,
76 TerminationReason::Interrupted => 130,
77 TerminationReason::RestartRequested => 3,
79 }
80 }
81
82 pub fn as_str(&self) -> &'static str {
87 match self {
88 TerminationReason::CompletionPromise => "completed",
89 TerminationReason::MaxIterations => "max_iterations",
90 TerminationReason::MaxRuntime => "max_runtime",
91 TerminationReason::MaxCost => "max_cost",
92 TerminationReason::ConsecutiveFailures => "consecutive_failures",
93 TerminationReason::LoopThrashing => "loop_thrashing",
94 TerminationReason::ValidationFailure => "validation_failure",
95 TerminationReason::Stopped => "stopped",
96 TerminationReason::Interrupted => "interrupted",
97 TerminationReason::ChaosModeComplete => "chaos_complete",
98 TerminationReason::ChaosModeMaxIterations => "chaos_max_iterations",
99 TerminationReason::RestartRequested => "restart_requested",
100 }
101 }
102
103 pub fn is_success(&self) -> bool {
105 matches!(
106 self,
107 TerminationReason::CompletionPromise | TerminationReason::ChaosModeComplete
108 )
109 }
110
111 pub fn triggers_chaos_mode(&self) -> bool {
115 matches!(self, TerminationReason::CompletionPromise)
116 }
117}
118
119pub struct EventLoop {
121 config: RalphConfig,
122 registry: HatRegistry,
123 bus: EventBus,
124 state: LoopState,
125 instruction_builder: InstructionBuilder,
126 ralph: HatlessRalph,
127 robot_guidance: Vec<String>,
129 pub(crate) event_reader: EventReader,
132 diagnostics: crate::diagnostics::DiagnosticsCollector,
133 loop_context: Option<LoopContext>,
135 skill_registry: SkillRegistry,
137 telegram_service: Option<TelegramService>,
140}
141
142impl EventLoop {
143 pub fn new(config: RalphConfig) -> Self {
145 let diagnostics = crate::diagnostics::DiagnosticsCollector::new(std::path::Path::new("."))
148 .unwrap_or_else(|e| {
149 debug!(
150 "Failed to initialize diagnostics: {}, using disabled collector",
151 e
152 );
153 crate::diagnostics::DiagnosticsCollector::disabled()
154 });
155
156 Self::with_diagnostics(config, diagnostics)
157 }
158
159 pub fn with_context(config: RalphConfig, context: LoopContext) -> Self {
165 let diagnostics = crate::diagnostics::DiagnosticsCollector::new(context.workspace())
166 .unwrap_or_else(|e| {
167 debug!(
168 "Failed to initialize diagnostics: {}, using disabled collector",
169 e
170 );
171 crate::diagnostics::DiagnosticsCollector::disabled()
172 });
173
174 Self::with_context_and_diagnostics(config, context, diagnostics)
175 }
176
177 pub fn with_context_and_diagnostics(
179 config: RalphConfig,
180 context: LoopContext,
181 diagnostics: crate::diagnostics::DiagnosticsCollector,
182 ) -> Self {
183 let registry = HatRegistry::from_config(&config);
184 let instruction_builder =
185 InstructionBuilder::with_events(config.core.clone(), config.events.clone());
186
187 let mut bus = EventBus::new();
188
189 for hat in registry.all() {
193 bus.register(hat.clone());
194 }
195
196 let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); bus.register(ralph_hat);
200
201 if registry.is_empty() {
202 debug!("Solo mode: Ralph is the only coordinator");
203 } else {
204 debug!(
205 "Multi-hat mode: {} custom hats + Ralph as fallback",
206 registry.len()
207 );
208 }
209
210 let skill_registry = if config.skills.enabled {
212 SkillRegistry::from_config(
213 &config.skills,
214 context.workspace(),
215 Some(config.cli.backend.as_str()),
216 )
217 .unwrap_or_else(|e| {
218 warn!(
219 "Failed to build skill registry: {}, using empty registry",
220 e
221 );
222 SkillRegistry::new(Some(config.cli.backend.as_str()))
223 })
224 } else {
225 SkillRegistry::new(Some(config.cli.backend.as_str()))
226 };
227
228 let skill_index = if config.skills.enabled {
229 skill_registry.build_index(None)
230 } else {
231 String::new()
232 };
233
234 let ralph = HatlessRalph::new(
236 config.event_loop.completion_promise.clone(),
237 config.core.clone(),
238 ®istry,
239 config.event_loop.starting_event.clone(),
240 )
241 .with_memories_enabled(config.memories.enabled)
242 .with_skill_index(skill_index);
243
244 let events_path = std::fs::read_to_string(context.current_events_marker())
248 .map(|s| {
249 let relative = s.trim();
250 context.workspace().join(relative)
251 })
252 .unwrap_or_else(|_| context.events_path());
253 let event_reader = EventReader::new(&events_path);
254
255 let telegram_service = Self::create_telegram_service(&config, Some(&context));
258
259 Self {
260 config,
261 registry,
262 bus,
263 state: LoopState::new(),
264 instruction_builder,
265 ralph,
266 robot_guidance: Vec::new(),
267 event_reader,
268 diagnostics,
269 loop_context: Some(context),
270 skill_registry,
271 telegram_service,
272 }
273 }
274
275 pub fn with_diagnostics(
277 config: RalphConfig,
278 diagnostics: crate::diagnostics::DiagnosticsCollector,
279 ) -> Self {
280 let registry = HatRegistry::from_config(&config);
281 let instruction_builder =
282 InstructionBuilder::with_events(config.core.clone(), config.events.clone());
283
284 let mut bus = EventBus::new();
285
286 for hat in registry.all() {
290 bus.register(hat.clone());
291 }
292
293 let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); bus.register(ralph_hat);
297
298 if registry.is_empty() {
299 debug!("Solo mode: Ralph is the only coordinator");
300 } else {
301 debug!(
302 "Multi-hat mode: {} custom hats + Ralph as fallback",
303 registry.len()
304 );
305 }
306
307 let workspace_root = std::path::Path::new(".");
309 let skill_registry = if config.skills.enabled {
310 SkillRegistry::from_config(
311 &config.skills,
312 workspace_root,
313 Some(config.cli.backend.as_str()),
314 )
315 .unwrap_or_else(|e| {
316 warn!(
317 "Failed to build skill registry: {}, using empty registry",
318 e
319 );
320 SkillRegistry::new(Some(config.cli.backend.as_str()))
321 })
322 } else {
323 SkillRegistry::new(Some(config.cli.backend.as_str()))
324 };
325
326 let skill_index = if config.skills.enabled {
327 skill_registry.build_index(None)
328 } else {
329 String::new()
330 };
331
332 let ralph = HatlessRalph::new(
334 config.event_loop.completion_promise.clone(),
335 config.core.clone(),
336 ®istry,
337 config.event_loop.starting_event.clone(),
338 )
339 .with_memories_enabled(config.memories.enabled)
340 .with_skill_index(skill_index);
341
342 let events_path = std::fs::read_to_string(".ralph/current-events")
345 .map(|s| s.trim().to_string())
346 .unwrap_or_else(|_| ".ralph/events.jsonl".to_string());
347 let event_reader = EventReader::new(&events_path);
348
349 let telegram_service = Self::create_telegram_service(&config, None);
352
353 Self {
354 config,
355 registry,
356 bus,
357 state: LoopState::new(),
358 instruction_builder,
359 ralph,
360 robot_guidance: Vec::new(),
361 event_reader,
362 diagnostics,
363 loop_context: None,
364 skill_registry,
365 telegram_service,
366 }
367 }
368
369 fn create_telegram_service(
378 config: &RalphConfig,
379 context: Option<&LoopContext>,
380 ) -> Option<TelegramService> {
381 if !config.robot.enabled {
382 return None;
383 }
384
385 if let Some(ctx) = context
388 && !ctx.is_primary()
389 {
390 debug!(
391 workspace = %ctx.workspace().display(),
392 "Skipping Telegram service: not the primary loop"
393 );
394 return None;
395 }
396
397 let workspace_root = context
398 .map(|ctx| ctx.workspace().to_path_buf())
399 .unwrap_or_else(|| config.core.workspace_root.clone());
400
401 let bot_token = config.robot.resolve_bot_token();
402 let timeout_secs = config.robot.timeout_seconds.unwrap_or(300);
403 let loop_id = context
404 .and_then(|ctx| ctx.loop_id().map(String::from))
405 .unwrap_or_else(|| "main".to_string());
406
407 match TelegramService::new(workspace_root, bot_token, timeout_secs, loop_id) {
408 Ok(service) => {
409 if let Err(e) = service.start() {
410 warn!(error = %e, "Failed to start Telegram service");
411 return None;
412 }
413 info!(
414 bot_token = %service.bot_token_masked(),
415 timeout_secs = service.timeout_secs(),
416 "Telegram human-in-the-loop service active"
417 );
418 Some(service)
419 }
420 Err(e) => {
421 warn!(error = %e, "Failed to create Telegram service");
422 None
423 }
424 }
425 }
426
427 pub fn loop_context(&self) -> Option<&LoopContext> {
429 self.loop_context.as_ref()
430 }
431
432 fn tasks_path(&self) -> PathBuf {
434 self.loop_context
435 .as_ref()
436 .map(|ctx| ctx.tasks_path())
437 .unwrap_or_else(|| PathBuf::from(".ralph/agent/tasks.jsonl"))
438 }
439
440 fn scratchpad_path(&self) -> PathBuf {
442 self.loop_context
443 .as_ref()
444 .map(|ctx| ctx.scratchpad_path())
445 .unwrap_or_else(|| PathBuf::from(&self.config.core.scratchpad))
446 }
447
448 pub fn state(&self) -> &LoopState {
450 &self.state
451 }
452
453 pub fn config(&self) -> &RalphConfig {
455 &self.config
456 }
457
458 pub fn registry(&self) -> &HatRegistry {
460 &self.registry
461 }
462
463 pub fn get_hat_backend(&self, hat_id: &HatId) -> Option<&HatBackend> {
468 self.registry
469 .get_config(hat_id)
470 .and_then(|config| config.backend.as_ref())
471 }
472
473 pub fn add_observer<F>(&mut self, observer: F)
478 where
479 F: Fn(&Event) + Send + 'static,
480 {
481 self.bus.add_observer(observer);
482 }
483
484 #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
488 pub fn set_observer<F>(&mut self, observer: F)
489 where
490 F: Fn(&Event) + Send + 'static,
491 {
492 #[allow(deprecated)]
493 self.bus.set_observer(observer);
494 }
495
496 pub fn check_termination(&self) -> Option<TerminationReason> {
498 let cfg = &self.config.event_loop;
499
500 if self.state.iteration >= cfg.max_iterations {
501 return Some(TerminationReason::MaxIterations);
502 }
503
504 if self.state.elapsed().as_secs() >= cfg.max_runtime_seconds {
505 return Some(TerminationReason::MaxRuntime);
506 }
507
508 if let Some(max_cost) = cfg.max_cost_usd
509 && self.state.cumulative_cost >= max_cost
510 {
511 return Some(TerminationReason::MaxCost);
512 }
513
514 if self.state.consecutive_failures >= cfg.max_consecutive_failures {
515 return Some(TerminationReason::ConsecutiveFailures);
516 }
517
518 if self.state.abandoned_task_redispatches >= 3 {
520 return Some(TerminationReason::LoopThrashing);
521 }
522
523 if self.state.consecutive_malformed_events >= 3 {
525 return Some(TerminationReason::ValidationFailure);
526 }
527
528 let restart_path =
530 std::path::Path::new(&self.config.core.workspace_root).join(".ralph/restart-requested");
531 if restart_path.exists() {
532 return Some(TerminationReason::RestartRequested);
533 }
534
535 None
536 }
537
538 pub fn initialize(&mut self, prompt_content: &str) {
540 let topic = self
542 .config
543 .event_loop
544 .starting_event
545 .clone()
546 .unwrap_or_else(|| "task.start".to_string());
547 self.initialize_with_topic(&topic, prompt_content);
548 }
549
550 pub fn initialize_resume(&mut self, prompt_content: &str) {
555 self.initialize_with_topic("task.resume", prompt_content);
557 }
558
559 fn initialize_with_topic(&mut self, topic: &str, prompt_content: &str) {
561 self.ralph.set_objective(prompt_content.to_string());
565
566 let start_event = Event::new(topic, prompt_content);
567 self.bus.publish(start_event);
568 debug!(topic = topic, "Published {} event", topic);
569 }
570
571 pub fn next_hat(&self) -> Option<&HatId> {
580 let next = self.bus.next_hat_with_pending();
581
582 next.as_ref()?;
584
585 if self.registry.is_empty() {
588 next
590 } else {
591 self.bus.hat_ids().find(|id| id.as_str() == "ralph")
594 }
595 }
596
597 pub fn has_pending_events(&self) -> bool {
602 self.bus.next_hat_with_pending().is_some()
603 }
604
605 pub fn has_pending_human_events(&self) -> bool {
610 self.bus.hat_ids().any(|hat_id| {
611 self.bus
612 .peek_pending(hat_id)
613 .map(|events| {
614 events.iter().any(|e| {
615 let topic = e.topic.as_str();
616 topic == "human.response" || topic == "human.guidance"
617 })
618 })
619 .unwrap_or(false)
620 })
621 }
622
623 pub fn get_hat_publishes(&self, hat_id: &HatId) -> Vec<String> {
627 self.registry
628 .get(hat_id)
629 .map(|hat| hat.publishes.iter().map(|t| t.to_string()).collect())
630 .unwrap_or_default()
631 }
632
633 pub fn inject_fallback_event(&mut self) -> bool {
640 let fallback_event = Event::new(
641 "task.resume",
642 "RECOVERY: Previous iteration did not publish an event. \
643 Review the scratchpad and either dispatch the next task or complete the loop.",
644 );
645
646 let fallback_event = match &self.state.last_hat {
649 Some(hat_id) if hat_id.as_str() != "ralph" => {
650 debug!(
651 hat = %hat_id.as_str(),
652 "Injecting fallback event to recover - targeting last hat with task.resume"
653 );
654 fallback_event.with_target(hat_id.clone())
655 }
656 _ => {
657 debug!("Injecting fallback event to recover - triggering Ralph with task.resume");
658 fallback_event
659 }
660 };
661
662 self.bus.publish(fallback_event);
663 true
664 }
665
666 pub fn build_prompt(&mut self, hat_id: &HatId) -> Option<String> {
680 if hat_id.as_str() == "ralph" {
683 if self.registry.is_empty() {
684 let events = self.bus.take_pending(&hat_id.clone());
686
687 let (guidance_events, regular_events): (Vec<_>, Vec<_>) = events
689 .into_iter()
690 .partition(|e| e.topic.as_str() == "human.guidance");
691
692 let events_context = regular_events
693 .iter()
694 .map(|e| Self::format_event(e))
695 .collect::<Vec<_>>()
696 .join("\n");
697
698 self.update_robot_guidance(guidance_events);
700 self.apply_robot_guidance();
701
702 let base_prompt = self.ralph.build_prompt(&events_context, &[]);
704 self.ralph.clear_robot_guidance();
705 let with_skills = self.prepend_auto_inject_skills(base_prompt);
706 let with_scratchpad = self.prepend_scratchpad(with_skills);
707 let final_prompt = self.prepend_ready_tasks(with_scratchpad);
708
709 debug!("build_prompt: routing to HatlessRalph (solo mode)");
710 return Some(final_prompt);
711 } else {
712 let mut all_hat_ids: Vec<HatId> = self.bus.hat_ids().cloned().collect();
714 all_hat_ids.sort_by(|a, b| a.as_str().cmp(b.as_str()));
716
717 let mut all_events = Vec::new();
718 let mut system_events = Vec::new();
719
720 for id in &all_hat_ids {
721 let pending = self.bus.take_pending(id);
722 if pending.is_empty() {
723 continue;
724 }
725
726 let (drop_pending, exhausted_event) = self.check_hat_exhaustion(id, &pending);
727 if drop_pending {
728 if let Some(exhausted_event) = exhausted_event {
730 all_events.push(exhausted_event.clone());
731 system_events.push(exhausted_event);
732 }
733 continue;
734 }
735
736 all_events.extend(pending);
737 }
738
739 for event in system_events {
742 self.bus.publish(event);
743 }
744
745 let (guidance_events, regular_events): (Vec<_>, Vec<_>) = all_events
747 .into_iter()
748 .partition(|e| e.topic.as_str() == "human.guidance");
749
750 self.update_robot_guidance(guidance_events);
753 self.apply_robot_guidance();
754
755 let active_hat_ids = self.determine_active_hat_ids(®ular_events);
757 self.record_hat_activations(&active_hat_ids);
758 let active_hats = self.determine_active_hats(®ular_events);
759
760 let events_context = regular_events
762 .iter()
763 .map(|e| Self::format_event(e))
764 .collect::<Vec<_>>()
765 .join("\n");
766
767 let base_prompt = self.ralph.build_prompt(&events_context, &active_hats);
769
770 debug!(
772 "build_prompt: routing to HatlessRalph (multi-hat coordinator mode), active_hats: {:?}",
773 active_hats
774 .iter()
775 .map(|h| h.id.as_str())
776 .collect::<Vec<_>>()
777 );
778
779 self.ralph.clear_robot_guidance();
781 let with_skills = self.prepend_auto_inject_skills(base_prompt);
782 let with_scratchpad = self.prepend_scratchpad(with_skills);
783 let final_prompt = self.prepend_ready_tasks(with_scratchpad);
784
785 return Some(final_prompt);
786 }
787 }
788
789 let events = self.bus.take_pending(&hat_id.clone());
793 let events_context = events
794 .iter()
795 .map(|e| Self::format_event(e))
796 .collect::<Vec<_>>()
797 .join("\n");
798
799 let hat = self.registry.get(hat_id)?;
800
801 debug!(
803 "build_prompt: hat_id='{}', instructions.is_empty()={}",
804 hat_id.as_str(),
805 hat.instructions.is_empty()
806 );
807
808 debug!(
810 "build_prompt: routing to build_custom_hat() for '{}'",
811 hat_id.as_str()
812 );
813 Some(
814 self.instruction_builder
815 .build_custom_hat(hat, &events_context),
816 )
817 }
818
819 fn update_robot_guidance(&mut self, guidance_events: Vec<Event>) {
825 if guidance_events.is_empty() {
826 return;
827 }
828
829 self.persist_guidance_to_scratchpad(&guidance_events);
831
832 self.robot_guidance
833 .extend(guidance_events.into_iter().map(|e| e.payload));
834 }
835
836 fn persist_guidance_to_scratchpad(&self, guidance_events: &[Event]) {
841 use std::io::Write;
842
843 let scratchpad_path = self.scratchpad_path();
844 let resolved_path = if scratchpad_path.is_relative() {
845 self.config.core.workspace_root.join(&scratchpad_path)
846 } else {
847 scratchpad_path
848 };
849
850 if let Some(parent) = resolved_path.parent()
852 && !parent.exists()
853 && let Err(e) = std::fs::create_dir_all(parent)
854 {
855 warn!("Failed to create scratchpad directory: {}", e);
856 return;
857 }
858
859 let mut file = match std::fs::OpenOptions::new()
860 .create(true)
861 .append(true)
862 .open(&resolved_path)
863 {
864 Ok(f) => f,
865 Err(e) => {
866 warn!("Failed to open scratchpad for guidance persistence: {}", e);
867 return;
868 }
869 };
870
871 let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
872 for event in guidance_events {
873 let entry = format!(
874 "\n### HUMAN GUIDANCE ({})\n\n{}\n",
875 timestamp, event.payload
876 );
877 if let Err(e) = file.write_all(entry.as_bytes()) {
878 warn!("Failed to write guidance to scratchpad: {}", e);
879 }
880 }
881
882 info!(
883 count = guidance_events.len(),
884 "Persisted human guidance to scratchpad"
885 );
886 }
887
888 fn apply_robot_guidance(&mut self) {
890 if self.robot_guidance.is_empty() {
891 return;
892 }
893
894 self.ralph.set_robot_guidance(self.robot_guidance.clone());
895 }
896
897 fn prepend_auto_inject_skills(&self, prompt: String) -> String {
907 let mut prefix = String::new();
908
909 self.inject_memories_and_tools_skill(&mut prefix);
911
912 self.inject_robot_skill(&mut prefix);
914
915 self.inject_custom_auto_skills(&mut prefix);
917
918 if prefix.is_empty() {
919 return prompt;
920 }
921
922 prefix.push_str("\n\n");
923 prefix.push_str(&prompt);
924 prefix
925 }
926
927 fn inject_memories_and_tools_skill(&self, prefix: &mut String) {
935 let memories_config = &self.config.memories;
936
937 if memories_config.enabled && memories_config.inject == InjectMode::Auto {
939 info!(
940 "Memory injection check: enabled={}, inject={:?}, workspace_root={:?}",
941 memories_config.enabled, memories_config.inject, self.config.core.workspace_root
942 );
943
944 let workspace_root = &self.config.core.workspace_root;
945 let store = MarkdownMemoryStore::with_default_path(workspace_root);
946 let memories_path = workspace_root.join(".ralph/agent/memories.md");
947
948 info!(
949 "Looking for memories at: {:?} (exists: {})",
950 memories_path,
951 memories_path.exists()
952 );
953
954 let memories = match store.load() {
955 Ok(memories) => {
956 info!("Successfully loaded {} memories from store", memories.len());
957 memories
958 }
959 Err(e) => {
960 info!(
961 "Failed to load memories for injection: {} (path: {:?})",
962 e, memories_path
963 );
964 Vec::new()
965 }
966 };
967
968 if memories.is_empty() {
969 info!("Memory store is empty - no memories to inject");
970 } else {
971 let mut memories_content = format_memories_as_markdown(&memories);
972
973 if memories_config.budget > 0 {
974 let original_len = memories_content.len();
975 memories_content =
976 truncate_to_budget(&memories_content, memories_config.budget);
977 debug!(
978 "Applied budget: {} chars -> {} chars (budget: {})",
979 original_len,
980 memories_content.len(),
981 memories_config.budget
982 );
983 }
984
985 info!(
986 "Injecting {} memories ({} chars) into prompt",
987 memories.len(),
988 memories_content.len()
989 );
990
991 prefix.push_str(&memories_content);
992 }
993 }
994
995 if memories_config.enabled || self.config.tasks.enabled {
997 if let Some(skill) = self.skill_registry.get("ralph-tools") {
998 if !prefix.is_empty() {
999 prefix.push_str("\n\n");
1000 }
1001 prefix.push_str(&format!(
1002 "<ralph-tools-skill>\n{}\n</ralph-tools-skill>",
1003 skill.content.trim()
1004 ));
1005 debug!("Injected ralph-tools skill from registry");
1006 } else {
1007 debug!("ralph-tools skill not found in registry - skill content not injected");
1008 }
1009 }
1010 }
1011
1012 fn inject_robot_skill(&self, prefix: &mut String) {
1017 if !self.config.robot.enabled {
1018 return;
1019 }
1020
1021 if let Some(skill) = self.skill_registry.get("robot-interaction") {
1022 if !prefix.is_empty() {
1023 prefix.push_str("\n\n");
1024 }
1025 prefix.push_str(&format!(
1026 "<robot-skill>\n{}\n</robot-skill>",
1027 skill.content.trim()
1028 ));
1029 debug!("Injected robot interaction skill from registry");
1030 }
1031 }
1032
1033 fn inject_custom_auto_skills(&self, prefix: &mut String) {
1035 for skill in self.skill_registry.auto_inject_skills(None) {
1036 if skill.name == "ralph-tools" || skill.name == "robot-interaction" {
1038 continue;
1039 }
1040
1041 if !prefix.is_empty() {
1042 prefix.push_str("\n\n");
1043 }
1044 prefix.push_str(&format!(
1045 "<{name}-skill>\n{content}\n</{name}-skill>",
1046 name = skill.name,
1047 content = skill.content.trim()
1048 ));
1049 debug!("Injected auto-inject skill: {}", skill.name);
1050 }
1051 }
1052
1053 fn prepend_scratchpad(&self, prompt: String) -> String {
1059 let scratchpad_path = self.scratchpad_path();
1060
1061 let resolved_path = if scratchpad_path.is_relative() {
1062 self.config.core.workspace_root.join(&scratchpad_path)
1063 } else {
1064 scratchpad_path
1065 };
1066
1067 if !resolved_path.exists() {
1068 debug!(
1069 "Scratchpad not found at {:?}, skipping injection",
1070 resolved_path
1071 );
1072 return prompt;
1073 }
1074
1075 let content = match std::fs::read_to_string(&resolved_path) {
1076 Ok(c) => c,
1077 Err(e) => {
1078 info!("Failed to read scratchpad for injection: {}", e);
1079 return prompt;
1080 }
1081 };
1082
1083 if content.trim().is_empty() {
1084 debug!("Scratchpad is empty, skipping injection");
1085 return prompt;
1086 }
1087
1088 let char_budget = 4000 * 4;
1090 let content = if content.len() > char_budget {
1091 let start = content.len() - char_budget;
1093 let line_start = content[start..].find('\n').map_or(start, |n| start + n + 1);
1094 let discarded = &content[..line_start];
1095
1096 let headings: Vec<&str> = discarded
1098 .lines()
1099 .filter(|line| line.starts_with('#'))
1100 .collect();
1101 let summary = if headings.is_empty() {
1102 format!(
1103 "<!-- earlier content truncated ({} chars omitted) -->",
1104 line_start
1105 )
1106 } else {
1107 format!(
1108 "<!-- earlier content truncated ({} chars omitted) -->\n\
1109 <!-- discarded sections: {} -->",
1110 line_start,
1111 headings.join(" | ")
1112 )
1113 };
1114
1115 format!("{}\n\n{}", summary, &content[line_start..])
1116 } else {
1117 content
1118 };
1119
1120 info!("Injecting scratchpad ({} chars) into prompt", content.len());
1121
1122 let mut final_prompt = format!(
1123 "<scratchpad path=\"{}\">\n{}\n</scratchpad>\n\n",
1124 self.config.core.scratchpad, content
1125 );
1126 final_prompt.push_str(&prompt);
1127 final_prompt
1128 }
1129
1130 fn prepend_ready_tasks(&self, prompt: String) -> String {
1136 if !self.config.tasks.enabled {
1137 return prompt;
1138 }
1139
1140 use crate::task::TaskStatus;
1141 use crate::task_store::TaskStore;
1142
1143 let tasks_path = self.tasks_path();
1144 let resolved_path = if tasks_path.is_relative() {
1145 self.config.core.workspace_root.join(&tasks_path)
1146 } else {
1147 tasks_path
1148 };
1149
1150 if !resolved_path.exists() {
1151 return prompt;
1152 }
1153
1154 let store = match TaskStore::load(&resolved_path) {
1155 Ok(s) => s,
1156 Err(e) => {
1157 info!("Failed to load task store for injection: {}", e);
1158 return prompt;
1159 }
1160 };
1161
1162 let ready = store.ready();
1163 let open = store.open();
1164 let closed_count = store.all().len() - open.len();
1165
1166 if open.is_empty() && closed_count == 0 {
1167 return prompt;
1168 }
1169
1170 let mut section = String::from("<ready-tasks>\n");
1171 if ready.is_empty() && open.is_empty() {
1172 section.push_str("No open tasks. Create tasks with `ralph tools task add`.\n");
1173 } else {
1174 section.push_str(&format!(
1175 "## Tasks: {} ready, {} open, {} closed\n\n",
1176 ready.len(),
1177 open.len(),
1178 closed_count
1179 ));
1180 for task in &ready {
1181 let status_icon = match task.status {
1182 TaskStatus::Open => "[ ]",
1183 TaskStatus::InProgress => "[~]",
1184 _ => "[?]",
1185 };
1186 section.push_str(&format!(
1187 "- {} [P{}] {} ({})\n",
1188 status_icon, task.priority, task.title, task.id
1189 ));
1190 }
1191 let ready_ids: Vec<&str> = ready.iter().map(|t| t.id.as_str()).collect();
1193 let blocked: Vec<_> = open
1194 .iter()
1195 .filter(|t| !ready_ids.contains(&t.id.as_str()))
1196 .collect();
1197 if !blocked.is_empty() {
1198 section.push_str("\nBlocked:\n");
1199 for task in blocked {
1200 section.push_str(&format!(
1201 "- [blocked] [P{}] {} ({}) — blocked by: {}\n",
1202 task.priority,
1203 task.title,
1204 task.id,
1205 task.blocked_by.join(", ")
1206 ));
1207 }
1208 }
1209 }
1210 section.push_str("</ready-tasks>\n\n");
1211
1212 info!(
1213 "Injecting ready tasks ({} ready, {} open, {} closed) into prompt",
1214 ready.len(),
1215 open.len(),
1216 closed_count
1217 );
1218
1219 let mut final_prompt = section;
1220 final_prompt.push_str(&prompt);
1221 final_prompt
1222 }
1223
1224 pub fn build_ralph_prompt(&self, prompt_content: &str) -> String {
1226 self.ralph.build_prompt(prompt_content, &[])
1227 }
1228
1229 fn determine_active_hats(&self, events: &[Event]) -> Vec<&Hat> {
1232 let mut active_hats = Vec::new();
1233 for id in self.determine_active_hat_ids(events) {
1234 if let Some(hat) = self.registry.get(&id) {
1235 active_hats.push(hat);
1236 }
1237 }
1238 active_hats
1239 }
1240
1241 fn determine_active_hat_ids(&self, events: &[Event]) -> Vec<HatId> {
1242 let mut active_hat_ids = Vec::new();
1243 for event in events {
1244 if let Some(hat) = self.registry.get_for_topic(event.topic.as_str()) {
1245 if !active_hat_ids.iter().any(|id| id == &hat.id) {
1247 active_hat_ids.push(hat.id.clone());
1248 }
1249 }
1250 }
1251 active_hat_ids
1252 }
1253
1254 fn format_event(event: &Event) -> String {
1259 let topic = &event.topic;
1260 let payload = &event.payload;
1261
1262 if topic.as_str() == "task.start" || topic.as_str() == "task.resume" {
1263 format!(
1264 "Event: {} - <top-level-prompt>\n{}\n</top-level-prompt>",
1265 topic, payload
1266 )
1267 } else {
1268 format!("Event: {} - {}", topic, payload)
1269 }
1270 }
1271
1272 fn check_hat_exhaustion(&mut self, hat_id: &HatId, dropped: &[Event]) -> (bool, Option<Event>) {
1273 let Some(config) = self.registry.get_config(hat_id) else {
1274 return (false, None);
1275 };
1276 let Some(max) = config.max_activations else {
1277 return (false, None);
1278 };
1279
1280 let count = *self.state.hat_activation_counts.get(hat_id).unwrap_or(&0);
1281 if count < max {
1282 return (false, None);
1283 }
1284
1285 let should_emit = self.state.exhausted_hats.insert(hat_id.clone());
1287
1288 if !should_emit {
1289 return (true, None);
1291 }
1292
1293 let mut dropped_topics: Vec<String> = dropped.iter().map(|e| e.topic.to_string()).collect();
1294 dropped_topics.sort();
1295
1296 let payload = format!(
1297 "Hat '{hat}' exhausted.\n- max_activations: {max}\n- activations: {count}\n- dropped_topics:\n - {topics}",
1298 hat = hat_id.as_str(),
1299 max = max,
1300 count = count,
1301 topics = dropped_topics.join("\n - ")
1302 );
1303
1304 warn!(
1305 hat = %hat_id.as_str(),
1306 max_activations = max,
1307 activations = count,
1308 "Hat exhausted (max_activations reached)"
1309 );
1310
1311 (
1312 true,
1313 Some(Event::new(
1314 format!("{}.exhausted", hat_id.as_str()),
1315 payload,
1316 )),
1317 )
1318 }
1319
1320 fn record_hat_activations(&mut self, active_hat_ids: &[HatId]) {
1321 for hat_id in active_hat_ids {
1322 *self
1323 .state
1324 .hat_activation_counts
1325 .entry(hat_id.clone())
1326 .or_insert(0) += 1;
1327 }
1328 }
1329
1330 pub fn get_active_hat_id(&self) -> HatId {
1333 for hat_id in self.bus.hat_ids() {
1335 let Some(events) = self.bus.peek_pending(hat_id) else {
1336 continue;
1337 };
1338 let Some(event) = events.first() else {
1339 continue;
1340 };
1341 if let Some(active_hat) = self.registry.get_for_topic(event.topic.as_str()) {
1342 return active_hat.id.clone();
1343 }
1344 }
1345 HatId::new("ralph")
1346 }
1347
1348 pub fn record_event_count(&mut self) -> usize {
1353 self.event_reader
1354 .read_new_events()
1355 .map(|r| r.events.len())
1356 .unwrap_or(0)
1357 }
1358
1359 pub fn check_default_publishes(&mut self, hat_id: &HatId, _events_before: usize) {
1365 let events_after = self
1366 .event_reader
1367 .read_new_events()
1368 .map(|r| r.events.len())
1369 .unwrap_or(0);
1370
1371 if events_after == 0
1372 && let Some(config) = self.registry.get_config(hat_id)
1373 && let Some(default_topic) = &config.default_publishes
1374 {
1375 let default_event = Event::new(default_topic.as_str(), "").with_source(hat_id.clone());
1377
1378 debug!(
1379 hat = %hat_id.as_str(),
1380 topic = %default_topic,
1381 "No events written by hat, injecting default_publishes event"
1382 );
1383
1384 self.bus.publish(default_event);
1385 }
1386 }
1387
1388 pub fn bus(&mut self) -> &mut EventBus {
1393 &mut self.bus
1394 }
1395
1396 pub fn process_output(
1400 &mut self,
1401 hat_id: &HatId,
1402 output: &str,
1403 success: bool,
1404 ) -> Option<TerminationReason> {
1405 self.state.iteration += 1;
1406 self.state.last_hat = Some(hat_id.clone());
1407
1408 if let Some(interval_secs) = self.config.robot.checkin_interval_seconds
1410 && let Some(ref telegram_service) = self.telegram_service
1411 {
1412 let elapsed = self.state.elapsed();
1413 let interval = std::time::Duration::from_secs(interval_secs);
1414 let last = self
1415 .state
1416 .last_checkin_at
1417 .map(|t| t.elapsed())
1418 .unwrap_or(elapsed);
1419
1420 if last >= interval {
1421 let context = self.build_checkin_context(hat_id);
1422 match telegram_service.send_checkin(self.state.iteration, elapsed, Some(&context)) {
1423 Ok(_) => {
1424 self.state.last_checkin_at = Some(std::time::Instant::now());
1425 debug!(iteration = self.state.iteration, "Sent Telegram check-in");
1426 }
1427 Err(e) => {
1428 warn!(error = %e, "Failed to send Telegram check-in");
1429 }
1430 }
1431 }
1432 }
1433
1434 self.diagnostics.log_orchestration(
1436 self.state.iteration,
1437 "loop",
1438 crate::diagnostics::OrchestrationEvent::IterationStarted,
1439 );
1440
1441 self.diagnostics.log_orchestration(
1443 self.state.iteration,
1444 "loop",
1445 crate::diagnostics::OrchestrationEvent::HatSelected {
1446 hat: hat_id.to_string(),
1447 reason: "process_output".to_string(),
1448 },
1449 );
1450
1451 if success {
1453 self.state.consecutive_failures = 0;
1454 } else {
1455 self.state.consecutive_failures += 1;
1456 }
1457
1458 if hat_id.as_str() == "ralph"
1462 && EventParser::contains_promise(output, &self.config.event_loop.completion_promise)
1463 {
1464 if self.config.memories.enabled {
1466 if let Ok(false) = self.verify_tasks_complete() {
1467 let open_tasks = self.get_open_task_list();
1468 warn!(
1469 open_tasks = ?open_tasks,
1470 "LOOP_COMPLETE with {} open task(s) - trusting agent decision",
1471 open_tasks.len()
1472 );
1473 }
1474 } else if let Ok(false) = self.verify_scratchpad_complete() {
1475 warn!("LOOP_COMPLETE with pending scratchpad tasks - trusting agent decision");
1476 }
1477
1478 info!("LOOP_COMPLETE detected - terminating");
1480
1481 self.diagnostics.log_orchestration(
1483 self.state.iteration,
1484 "loop",
1485 crate::diagnostics::OrchestrationEvent::LoopTerminated {
1486 reason: "completion_promise".to_string(),
1487 },
1488 );
1489
1490 return Some(TerminationReason::CompletionPromise);
1491 }
1492
1493 self.check_termination()
1499 }
1500
1501 fn extract_task_id(payload: &str) -> String {
1504 payload
1505 .lines()
1506 .next()
1507 .unwrap_or("unknown")
1508 .trim()
1509 .to_string()
1510 }
1511
1512 pub fn add_cost(&mut self, cost: f64) {
1514 self.state.cumulative_cost += cost;
1515 }
1516
1517 fn verify_scratchpad_complete(&self) -> Result<bool, std::io::Error> {
1524 let scratchpad_path = self.scratchpad_path();
1525
1526 if !scratchpad_path.exists() {
1527 return Err(std::io::Error::new(
1528 std::io::ErrorKind::NotFound,
1529 "Scratchpad does not exist",
1530 ));
1531 }
1532
1533 let content = std::fs::read_to_string(scratchpad_path)?;
1534
1535 let has_pending = content
1536 .lines()
1537 .any(|line| line.trim_start().starts_with("- [ ]"));
1538
1539 Ok(!has_pending)
1540 }
1541
1542 fn verify_tasks_complete(&self) -> Result<bool, std::io::Error> {
1543 use crate::task_store::TaskStore;
1544
1545 let tasks_path = self.tasks_path();
1546
1547 if !tasks_path.exists() {
1549 return Ok(true);
1550 }
1551
1552 let store = TaskStore::load(&tasks_path)?;
1553 Ok(!store.has_pending_tasks())
1554 }
1555
1556 fn build_checkin_context(&self, hat_id: &HatId) -> ralph_telegram::CheckinContext {
1558 let (open_tasks, closed_tasks) = self.count_tasks();
1559 ralph_telegram::CheckinContext {
1560 current_hat: Some(hat_id.as_str().to_string()),
1561 open_tasks,
1562 closed_tasks,
1563 cumulative_cost: self.state.cumulative_cost,
1564 }
1565 }
1566
1567 fn count_tasks(&self) -> (usize, usize) {
1572 use crate::task::TaskStatus;
1573 use crate::task_store::TaskStore;
1574
1575 let tasks_path = self.tasks_path();
1576 if !tasks_path.exists() {
1577 return (0, 0);
1578 }
1579
1580 match TaskStore::load(&tasks_path) {
1581 Ok(store) => {
1582 let total = store.all().len();
1583 let open = store.open().len();
1584 let closed = total - open;
1585 debug_assert_eq!(
1587 closed,
1588 store
1589 .all()
1590 .iter()
1591 .filter(|t| t.status == TaskStatus::Closed)
1592 .count()
1593 );
1594 (open, closed)
1595 }
1596 Err(_) => (0, 0),
1597 }
1598 }
1599
1600 fn get_open_task_list(&self) -> Vec<String> {
1602 use crate::task_store::TaskStore;
1603
1604 let tasks_path = self.tasks_path();
1605 if let Ok(store) = TaskStore::load(&tasks_path) {
1606 return store
1607 .open()
1608 .iter()
1609 .map(|t| format!("{}: {}", t.id, t.title))
1610 .collect();
1611 }
1612 vec![]
1613 }
1614
1615 pub fn process_events_from_jsonl(&mut self) -> std::io::Result<bool> {
1624 let result = self.event_reader.read_new_events()?;
1625
1626 for malformed in &result.malformed {
1628 let payload = format!(
1629 "Line {}: {}\nContent: {}",
1630 malformed.line_number, malformed.error, &malformed.content
1631 );
1632 let event = Event::new("event.malformed", &payload);
1633 self.bus.publish(event);
1634 self.state.consecutive_malformed_events += 1;
1635 warn!(
1636 line = malformed.line_number,
1637 consecutive = self.state.consecutive_malformed_events,
1638 "Malformed event line detected"
1639 );
1640 }
1641
1642 if !result.events.is_empty() {
1644 self.state.consecutive_malformed_events = 0;
1645 }
1646
1647 if result.events.is_empty() && result.malformed.is_empty() {
1648 return Ok(false);
1649 }
1650
1651 let mut has_orphans = false;
1652
1653 let mut validated_events = Vec::new();
1655 for event in result.events {
1656 let payload = event.payload.clone().unwrap_or_default();
1657
1658 if event.topic == "build.done" {
1659 if let Some(evidence) = EventParser::parse_backpressure_evidence(&payload) {
1661 if evidence.all_passed() {
1662 validated_events.push(Event::new(event.topic.as_str(), &payload));
1663 } else {
1664 warn!(
1666 tests = evidence.tests_passed,
1667 lint = evidence.lint_passed,
1668 typecheck = evidence.typecheck_passed,
1669 "build.done rejected: backpressure checks failed"
1670 );
1671
1672 self.diagnostics.log_orchestration(
1673 self.state.iteration,
1674 "jsonl",
1675 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1676 reason: format!(
1677 "backpressure checks failed: tests={}, lint={}, typecheck={}",
1678 evidence.tests_passed,
1679 evidence.lint_passed,
1680 evidence.typecheck_passed
1681 ),
1682 },
1683 );
1684
1685 validated_events.push(Event::new(
1686 "build.blocked",
1687 "Backpressure checks failed. Fix tests/lint/typecheck before emitting build.done.",
1688 ));
1689 }
1690 } else {
1691 warn!("build.done rejected: missing backpressure evidence");
1693
1694 self.diagnostics.log_orchestration(
1695 self.state.iteration,
1696 "jsonl",
1697 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1698 reason: "missing backpressure evidence".to_string(),
1699 },
1700 );
1701
1702 validated_events.push(Event::new(
1703 "build.blocked",
1704 "Missing backpressure evidence. Include 'tests: pass', 'lint: pass', 'typecheck: pass' in build.done payload.",
1705 ));
1706 }
1707 } else if event.topic == "review.done" {
1708 if let Some(evidence) = EventParser::parse_review_evidence(&payload) {
1710 if evidence.is_verified() {
1711 validated_events.push(Event::new(event.topic.as_str(), &payload));
1712 } else {
1713 warn!(
1715 tests = evidence.tests_passed,
1716 build = evidence.build_passed,
1717 "review.done rejected: verification checks failed"
1718 );
1719
1720 self.diagnostics.log_orchestration(
1721 self.state.iteration,
1722 "jsonl",
1723 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1724 reason: format!(
1725 "review verification failed: tests={}, build={}",
1726 evidence.tests_passed, evidence.build_passed
1727 ),
1728 },
1729 );
1730
1731 validated_events.push(Event::new(
1732 "review.blocked",
1733 "Review verification failed. Run tests and build before emitting review.done.",
1734 ));
1735 }
1736 } else {
1737 warn!("review.done rejected: missing verification evidence");
1739
1740 self.diagnostics.log_orchestration(
1741 self.state.iteration,
1742 "jsonl",
1743 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
1744 reason: "missing review verification evidence".to_string(),
1745 },
1746 );
1747
1748 validated_events.push(Event::new(
1749 "review.blocked",
1750 "Missing verification evidence. Include 'tests: pass' and 'build: pass' in review.done payload.",
1751 ));
1752 }
1753 } else {
1754 validated_events.push(Event::new(event.topic.as_str(), &payload));
1756 }
1757 }
1758
1759 let blocked_events: Vec<_> = validated_events
1761 .iter()
1762 .filter(|e| e.topic == "build.blocked".into())
1763 .collect();
1764
1765 for blocked_event in &blocked_events {
1766 let task_id = Self::extract_task_id(&blocked_event.payload);
1767
1768 let count = self
1769 .state
1770 .task_block_counts
1771 .entry(task_id.clone())
1772 .or_insert(0);
1773 *count += 1;
1774
1775 debug!(
1776 task_id = %task_id,
1777 block_count = *count,
1778 "Task blocked"
1779 );
1780
1781 if *count >= 3 && !self.state.abandoned_tasks.contains(&task_id) {
1783 warn!(
1784 task_id = %task_id,
1785 "Task abandoned after 3 consecutive blocks"
1786 );
1787
1788 self.state.abandoned_tasks.push(task_id.clone());
1789
1790 self.diagnostics.log_orchestration(
1791 self.state.iteration,
1792 "jsonl",
1793 crate::diagnostics::OrchestrationEvent::TaskAbandoned {
1794 reason: format!(
1795 "3 consecutive build.blocked events for task '{}'",
1796 task_id
1797 ),
1798 },
1799 );
1800
1801 let abandoned_event = Event::new(
1802 "build.task.abandoned",
1803 format!(
1804 "Task '{}' abandoned after 3 consecutive build.blocked events",
1805 task_id
1806 ),
1807 );
1808
1809 self.bus.publish(abandoned_event);
1810 }
1811 }
1812
1813 let has_blocked_event = !blocked_events.is_empty();
1815
1816 if has_blocked_event {
1817 self.state.consecutive_blocked += 1;
1818 } else {
1819 self.state.consecutive_blocked = 0;
1820 self.state.last_blocked_hat = None;
1821 }
1822
1823 let mut response_event = None;
1827 let ask_human_idx = validated_events
1828 .iter()
1829 .position(|e| e.topic == "interact.human".into());
1830
1831 if let Some(idx) = ask_human_idx {
1832 let ask_event = &validated_events[idx];
1833 let payload = ask_event.payload.clone();
1834
1835 if let Some(ref telegram_service) = self.telegram_service {
1836 info!(
1837 payload = %payload,
1838 "interact.human event detected — sending question via Telegram"
1839 );
1840
1841 let send_ok = match telegram_service.send_question(&payload) {
1843 Ok(_message_id) => true,
1844 Err(e) => {
1845 warn!(
1846 error = %e,
1847 "Failed to send interact.human question after retries — treating as timeout"
1848 );
1849 self.diagnostics.log_error(
1851 self.state.iteration,
1852 "telegram",
1853 crate::diagnostics::DiagnosticError::TelegramSendError {
1854 operation: "send_question".to_string(),
1855 error: e.to_string(),
1856 retry_count: ralph_telegram::MAX_SEND_RETRIES,
1857 },
1858 );
1859 false
1860 }
1861 };
1862
1863 if send_ok {
1866 let events_path = self
1867 .loop_context
1868 .as_ref()
1869 .map(|ctx| ctx.events_path())
1870 .unwrap_or_else(|| PathBuf::from(".ralph/events.jsonl"));
1871
1872 match telegram_service.wait_for_response(&events_path) {
1873 Ok(Some(response)) => {
1874 info!(
1875 response = %response,
1876 "Received human.response — continuing loop"
1877 );
1878 response_event = Some(Event::new("human.response", &response));
1880 }
1881 Ok(None) => {
1882 warn!(
1883 timeout_secs = telegram_service.timeout_secs(),
1884 "Human response timeout — continuing without response"
1885 );
1886 }
1887 Err(e) => {
1888 warn!(
1889 error = %e,
1890 "Error waiting for human response — continuing without response"
1891 );
1892 }
1893 }
1894 }
1895 } else {
1896 debug!(
1897 "interact.human event detected but no Telegram service active — passing through"
1898 );
1899 }
1900 }
1901
1902 for event in validated_events {
1907 self.diagnostics.log_orchestration(
1908 self.state.iteration,
1909 "jsonl",
1910 crate::diagnostics::OrchestrationEvent::EventPublished {
1911 topic: event.topic.to_string(),
1912 },
1913 );
1914
1915 if !self.registry.has_subscriber(event.topic.as_str()) {
1916 has_orphans = true;
1917 }
1918
1919 debug!(
1920 topic = %event.topic,
1921 "Publishing event from JSONL"
1922 );
1923 self.bus.publish(event);
1924 }
1925
1926 if let Some(response) = response_event {
1928 info!(
1929 topic = %response.topic,
1930 "Publishing human.response event from Telegram"
1931 );
1932 self.bus.publish(response);
1933 }
1934
1935 Ok(has_orphans)
1936 }
1937
1938 pub fn check_ralph_completion(&self, output: &str) -> bool {
1942 EventParser::contains_promise(output, &self.config.event_loop.completion_promise)
1943 }
1944
1945 pub fn publish_terminate_event(&mut self, reason: &TerminationReason) -> Event {
1952 self.stop_telegram_service();
1954
1955 let elapsed = self.state.elapsed();
1956 let duration_str = format_duration(elapsed);
1957
1958 let payload = format!(
1959 "## Reason\n{}\n\n## Status\n{}\n\n## Summary\n- Iterations: {}\n- Duration: {}\n- Exit code: {}",
1960 reason.as_str(),
1961 termination_status_text(reason),
1962 self.state.iteration,
1963 duration_str,
1964 reason.exit_code()
1965 );
1966
1967 let event = Event::new("loop.terminate", &payload);
1968
1969 self.bus.publish(event.clone());
1971
1972 info!(
1973 reason = %reason.as_str(),
1974 iterations = self.state.iteration,
1975 duration = %duration_str,
1976 "Wrapping up: {}. {} iterations in {}.",
1977 reason.as_str(),
1978 self.state.iteration,
1979 duration_str
1980 );
1981
1982 event
1983 }
1984
1985 pub fn telegram_shutdown_flag(&self) -> Option<Arc<AtomicBool>> {
1990 self.telegram_service.as_ref().map(|s| s.shutdown_flag())
1991 }
1992
1993 pub fn telegram_service(&self) -> Option<&TelegramService> {
1995 self.telegram_service.as_ref()
1996 }
1997
1998 pub fn telegram_service_mut(&mut self) -> Option<&mut TelegramService> {
2000 self.telegram_service.as_mut()
2001 }
2002
2003 fn stop_telegram_service(&mut self) {
2007 if let Some(service) = self.telegram_service.take() {
2008 service.stop();
2009 }
2010 }
2011
2012 pub fn check_for_user_prompt(&self, events: &[Event]) -> Option<UserPrompt> {
2020 events
2021 .iter()
2022 .find(|e| e.topic.as_str() == "user.prompt")
2023 .map(|e| UserPrompt {
2024 id: Self::extract_prompt_id(&e.payload),
2025 text: e.payload.clone(),
2026 })
2027 }
2028
2029 fn extract_prompt_id(payload: &str) -> String {
2034 if let Some(start) = payload.find("id=\"")
2036 && let Some(end) = payload[start + 4..].find('"')
2037 {
2038 return payload[start + 4..start + 4 + end].to_string();
2039 }
2040
2041 format!("q{}", Self::generate_prompt_id())
2043 }
2044
2045 fn generate_prompt_id() -> String {
2048 use std::time::{SystemTime, UNIX_EPOCH};
2049 let nanos = SystemTime::now()
2050 .duration_since(UNIX_EPOCH)
2051 .unwrap()
2052 .as_nanos();
2053 format!("{:x}", nanos % 0xFFFF_FFFF)
2054 }
2055}
2056
2057#[derive(Debug, Clone)]
2061pub struct UserPrompt {
2062 pub id: String,
2064 pub text: String,
2066}
2067
2068fn format_duration(d: Duration) -> String {
2070 let total_secs = d.as_secs();
2071 let hours = total_secs / 3600;
2072 let minutes = (total_secs % 3600) / 60;
2073 let seconds = total_secs % 60;
2074
2075 if hours > 0 {
2076 format!("{}h {}m {}s", hours, minutes, seconds)
2077 } else if minutes > 0 {
2078 format!("{}m {}s", minutes, seconds)
2079 } else {
2080 format!("{}s", seconds)
2081 }
2082}
2083
2084fn termination_status_text(reason: &TerminationReason) -> &'static str {
2086 match reason {
2087 TerminationReason::CompletionPromise => "All tasks completed successfully.",
2088 TerminationReason::MaxIterations => "Stopped at iteration limit.",
2089 TerminationReason::MaxRuntime => "Stopped at runtime limit.",
2090 TerminationReason::MaxCost => "Stopped at cost limit.",
2091 TerminationReason::ConsecutiveFailures => "Too many consecutive failures.",
2092 TerminationReason::LoopThrashing => {
2093 "Loop thrashing detected - same hat repeatedly blocked."
2094 }
2095 TerminationReason::ValidationFailure => "Too many consecutive malformed JSONL events.",
2096 TerminationReason::Stopped => "Manually stopped.",
2097 TerminationReason::Interrupted => "Interrupted by signal.",
2098 TerminationReason::ChaosModeComplete => "Chaos mode exploration complete.",
2099 TerminationReason::ChaosModeMaxIterations => "Chaos mode stopped at iteration limit.",
2100 TerminationReason::RestartRequested => "Restarting by human request.",
2101 }
2102}