1mod loop_state;
6#[cfg(test)]
7mod tests;
8
9pub use loop_state::LoopState;
10
11use crate::config::{HatBackend, InjectMode, RalphConfig};
12use crate::event_parser::{EventParser, MutationEvidence, MutationStatus};
13use crate::event_reader::EventReader;
14use crate::hat_registry::HatRegistry;
15use crate::hatless_ralph::HatlessRalph;
16use crate::instructions::InstructionBuilder;
17use crate::loop_context::LoopContext;
18use crate::memory_store::{MarkdownMemoryStore, format_memories_as_markdown, truncate_to_budget};
19use crate::skill_registry::SkillRegistry;
20use crate::text::floor_char_boundary;
21use ralph_proto::{CheckinContext, Event, EventBus, Hat, HatId, RobotService};
22use serde_json::{Map, Value};
23use std::path::PathBuf;
24use std::sync::Arc;
25use std::sync::atomic::AtomicBool;
26use std::time::Duration;
27use tracing::{debug, info, warn};
28
/// Summary flags from one event-processing pass.
///
/// NOTE(review): the code that constructs this struct is not visible in this
/// chunk; the field descriptions below are inferred from field names — confirm
/// against the producer.
#[derive(Debug, Clone)]
pub struct ProcessedEvents {
    /// Presumably: whether any events were seen during the pass.
    pub had_events: bool,
    /// Presumably: whether any `plan.*` events were seen during the pass.
    pub had_plan_events: bool,
    /// Presumably: parsed context from a `human.interact` event, if one was seen.
    pub human_interact_context: Option<Value>,
    /// Presumably: whether orphaned events were detected (exact meaning not visible here).
    pub has_orphans: bool,
}
42
/// Why the event loop stopped.
///
/// Each reason maps to a process exit code (`exit_code`) and a stable
/// snake_case label (`as_str`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationReason {
    /// A completion event was accepted (the only "success" reason).
    CompletionPromise,
    /// The configured iteration cap was reached.
    MaxIterations,
    /// The configured wall-clock runtime cap was reached.
    MaxRuntime,
    /// The configured cumulative cost cap was reached.
    MaxCost,
    /// Too many consecutive failed iterations.
    ConsecutiveFailures,
    /// Abandoned tasks were redispatched too many times.
    LoopThrashing,
    /// The same topic was emitted too many times in a row.
    LoopStale,
    /// Too many consecutive malformed events.
    ValidationFailure,
    /// A stop was requested via the workspace stop-request file.
    Stopped,
    /// The loop was interrupted (exit code 130, the conventional SIGINT status).
    Interrupted,
    /// A restart was requested via the workspace restart-request file.
    RestartRequested,
    /// The workspace root is no longer a directory.
    WorkspaceGone,
    /// The loop was cancelled gracefully via a `loop.cancel` event.
    Cancelled,
}

impl TerminationReason {
    /// Process exit code for this reason.
    ///
    /// `0` for completion and graceful cancellation, `1` for failures and stops,
    /// `2` for exhausted limits, `3` for restart requests, `130` for interrupts.
    pub fn exit_code(&self) -> i32 {
        match self {
            Self::CompletionPromise | Self::Cancelled => 0,
            Self::ConsecutiveFailures
            | Self::LoopThrashing
            | Self::LoopStale
            | Self::ValidationFailure
            | Self::Stopped
            | Self::WorkspaceGone => 1,
            Self::MaxIterations | Self::MaxRuntime | Self::MaxCost => 2,
            Self::RestartRequested => 3,
            Self::Interrupted => 130,
        }
    }

    /// Stable snake_case label used in logs and diagnostics.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::CompletionPromise => "completed",
            Self::Cancelled => "cancelled",
            Self::ConsecutiveFailures => "consecutive_failures",
            Self::LoopThrashing => "loop_thrashing",
            Self::LoopStale => "loop_stale",
            Self::ValidationFailure => "validation_failure",
            Self::Stopped => "stopped",
            Self::WorkspaceGone => "workspace_gone",
            Self::MaxIterations => "max_iterations",
            Self::MaxRuntime => "max_runtime",
            Self::MaxCost => "max_cost",
            Self::RestartRequested => "restart_requested",
            Self::Interrupted => "interrupted",
        }
    }

    /// `true` only for [`TerminationReason::CompletionPromise`].
    pub fn is_success(&self) -> bool {
        *self == Self::CompletionPromise
    }
}
129
/// Drives the hat/event orchestration loop: routes pending bus events to hats,
/// builds prompts, and decides when the loop terminates.
pub struct EventLoop {
    /// Loop configuration (limits, core paths, feature flags).
    config: RalphConfig,
    /// Custom hats built from `config`.
    registry: HatRegistry,
    /// Event bus; the constructors register every registry hat plus a `ralph`
    /// hat subscribed to `*`.
    bus: EventBus,
    /// Mutable per-loop counters and flags consulted by the termination checks.
    state: LoopState,
    /// Builds prompts for custom (non-`ralph`) hats.
    instruction_builder: InstructionBuilder,
    /// Builds the coordinator prompt used whenever `ralph` runs.
    ralph: HatlessRalph,
    /// Accumulated `human.guidance` payloads, re-applied to `ralph` on each prompt build.
    robot_guidance: Vec<String>,
    /// Reader over the active events JSONL file.
    pub(crate) event_reader: EventReader,
    /// Diagnostics sink (may be a disabled collector if initialization failed).
    diagnostics: crate::diagnostics::DiagnosticsCollector,
    /// `Some` when built via `with_context*`; path helpers fall back to defaults when `None`.
    loop_context: Option<LoopContext>,
    /// Skills available for auto-injection into prompts.
    skill_registry: SkillRegistry,
    /// Optional robot service, installed via `set_robot_service`.
    robot_service: Option<Box<dyn RobotService>>,
}
152
153impl EventLoop {
154 pub fn new(config: RalphConfig) -> Self {
156 let diagnostics = crate::diagnostics::DiagnosticsCollector::new(std::path::Path::new("."))
159 .unwrap_or_else(|e| {
160 debug!(
161 "Failed to initialize diagnostics: {}, using disabled collector",
162 e
163 );
164 crate::diagnostics::DiagnosticsCollector::disabled()
165 });
166
167 Self::with_diagnostics(config, diagnostics)
168 }
169
170 pub fn with_context(config: RalphConfig, context: LoopContext) -> Self {
176 let diagnostics = crate::diagnostics::DiagnosticsCollector::new(context.workspace())
177 .unwrap_or_else(|e| {
178 debug!(
179 "Failed to initialize diagnostics: {}, using disabled collector",
180 e
181 );
182 crate::diagnostics::DiagnosticsCollector::disabled()
183 });
184
185 Self::with_context_and_diagnostics(config, context, diagnostics)
186 }
187
188 pub fn with_context_and_diagnostics(
190 config: RalphConfig,
191 context: LoopContext,
192 diagnostics: crate::diagnostics::DiagnosticsCollector,
193 ) -> Self {
194 let registry = HatRegistry::from_config(&config);
195 let instruction_builder =
196 InstructionBuilder::with_events(config.core.clone(), config.events.clone());
197
198 let mut bus = EventBus::new();
199
200 for hat in registry.all() {
204 bus.register(hat.clone());
205 }
206
207 let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); bus.register(ralph_hat);
211
212 if registry.is_empty() {
213 debug!("Solo mode: Ralph is the only coordinator");
214 } else {
215 debug!(
216 "Multi-hat mode: {} custom hats + Ralph as fallback",
217 registry.len()
218 );
219 }
220
221 let skill_registry = if config.skills.enabled {
223 SkillRegistry::from_config(
224 &config.skills,
225 context.workspace(),
226 Some(config.cli.backend.as_str()),
227 )
228 .unwrap_or_else(|e| {
229 warn!(
230 "Failed to build skill registry: {}, using empty registry",
231 e
232 );
233 SkillRegistry::new(Some(config.cli.backend.as_str()))
234 })
235 } else {
236 SkillRegistry::new(Some(config.cli.backend.as_str()))
237 };
238
239 let skill_index = if config.skills.enabled {
240 skill_registry.build_index(None)
241 } else {
242 String::new()
243 };
244
245 let ralph = HatlessRalph::new(
247 config.event_loop.completion_promise.clone(),
248 config.core.clone(),
249 ®istry,
250 config.event_loop.starting_event.clone(),
251 )
252 .with_memories_enabled(config.memories.enabled)
253 .with_skill_index(skill_index);
254
255 let events_path = std::fs::read_to_string(context.current_events_marker())
259 .map(|s| {
260 let relative = s.trim();
261 context.workspace().join(relative)
262 })
263 .unwrap_or_else(|_| context.events_path());
264 let event_reader = EventReader::new(&events_path);
265
266 Self {
267 config,
268 registry,
269 bus,
270 state: LoopState::new(),
271 instruction_builder,
272 ralph,
273 robot_guidance: Vec::new(),
274 event_reader,
275 diagnostics,
276 loop_context: Some(context),
277 skill_registry,
278 robot_service: None,
279 }
280 }
281
282 pub fn with_diagnostics(
284 config: RalphConfig,
285 diagnostics: crate::diagnostics::DiagnosticsCollector,
286 ) -> Self {
287 let registry = HatRegistry::from_config(&config);
288 let instruction_builder =
289 InstructionBuilder::with_events(config.core.clone(), config.events.clone());
290
291 let mut bus = EventBus::new();
292
293 for hat in registry.all() {
297 bus.register(hat.clone());
298 }
299
300 let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); bus.register(ralph_hat);
304
305 if registry.is_empty() {
306 debug!("Solo mode: Ralph is the only coordinator");
307 } else {
308 debug!(
309 "Multi-hat mode: {} custom hats + Ralph as fallback",
310 registry.len()
311 );
312 }
313
314 let workspace_root = std::path::Path::new(".");
316 let skill_registry = if config.skills.enabled {
317 SkillRegistry::from_config(
318 &config.skills,
319 workspace_root,
320 Some(config.cli.backend.as_str()),
321 )
322 .unwrap_or_else(|e| {
323 warn!(
324 "Failed to build skill registry: {}, using empty registry",
325 e
326 );
327 SkillRegistry::new(Some(config.cli.backend.as_str()))
328 })
329 } else {
330 SkillRegistry::new(Some(config.cli.backend.as_str()))
331 };
332
333 let skill_index = if config.skills.enabled {
334 skill_registry.build_index(None)
335 } else {
336 String::new()
337 };
338
339 let ralph = HatlessRalph::new(
341 config.event_loop.completion_promise.clone(),
342 config.core.clone(),
343 ®istry,
344 config.event_loop.starting_event.clone(),
345 )
346 .with_memories_enabled(config.memories.enabled)
347 .with_skill_index(skill_index);
348
349 let events_path = std::fs::read_to_string(".ralph/current-events")
352 .map(|s| s.trim().to_string())
353 .unwrap_or_else(|_| ".ralph/events.jsonl".to_string());
354 let event_reader = EventReader::new(&events_path);
355
356 Self {
357 config,
358 registry,
359 bus,
360 state: LoopState::new(),
361 instruction_builder,
362 ralph,
363 robot_guidance: Vec::new(),
364 event_reader,
365 diagnostics,
366 loop_context: None,
367 skill_registry,
368 robot_service: None,
369 }
370 }
371
372 pub fn set_robot_service(&mut self, service: Box<dyn RobotService>) {
380 self.robot_service = Some(service);
381 }
382
383 pub fn loop_context(&self) -> Option<&LoopContext> {
385 self.loop_context.as_ref()
386 }
387
388 fn tasks_path(&self) -> PathBuf {
390 self.loop_context
391 .as_ref()
392 .map(|ctx| ctx.tasks_path())
393 .unwrap_or_else(|| PathBuf::from(".ralph/agent/tasks.jsonl"))
394 }
395
396 fn scratchpad_path(&self) -> PathBuf {
398 self.loop_context
399 .as_ref()
400 .map(|ctx| ctx.scratchpad_path())
401 .unwrap_or_else(|| PathBuf::from(&self.config.core.scratchpad))
402 }
403
404 pub fn state(&self) -> &LoopState {
406 &self.state
407 }
408
409 pub fn config(&self) -> &RalphConfig {
411 &self.config
412 }
413
414 pub fn registry(&self) -> &HatRegistry {
416 &self.registry
417 }
418
419 pub fn log_hook_run_telemetry(&self, entry: crate::diagnostics::HookRunTelemetryEntry) {
421 self.diagnostics.log_hook_run(entry);
422 }
423
424 pub fn get_hat_backend(&self, hat_id: &HatId) -> Option<&HatBackend> {
429 self.registry
430 .get_config(hat_id)
431 .and_then(|config| config.backend.as_ref())
432 }
433
434 pub fn add_observer<F>(&mut self, observer: F)
439 where
440 F: Fn(&Event) + Send + 'static,
441 {
442 self.bus.add_observer(observer);
443 }
444
445 #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
449 pub fn set_observer<F>(&mut self, observer: F)
450 where
451 F: Fn(&Event) + Send + 'static,
452 {
453 #[allow(deprecated)]
454 self.bus.set_observer(observer);
455 }
456
457 pub fn check_termination(&self) -> Option<TerminationReason> {
459 let cfg = &self.config.event_loop;
460
461 if self.state.iteration >= cfg.max_iterations {
462 return Some(TerminationReason::MaxIterations);
463 }
464
465 if self.state.elapsed().as_secs() >= cfg.max_runtime_seconds {
466 return Some(TerminationReason::MaxRuntime);
467 }
468
469 if let Some(max_cost) = cfg.max_cost_usd
470 && self.state.cumulative_cost >= max_cost
471 {
472 return Some(TerminationReason::MaxCost);
473 }
474
475 if self.state.consecutive_failures >= cfg.max_consecutive_failures {
476 return Some(TerminationReason::ConsecutiveFailures);
477 }
478
479 if self.state.abandoned_task_redispatches >= 3 {
481 return Some(TerminationReason::LoopThrashing);
482 }
483
484 if self.state.consecutive_malformed_events >= 3 {
486 return Some(TerminationReason::ValidationFailure);
487 }
488
489 if self.state.consecutive_same_topic >= 3 {
491 warn!(
492 topic = self.state.last_emitted_topic.as_deref().unwrap_or("?"),
493 count = self.state.consecutive_same_topic,
494 "Stale loop detected: same topic emitted consecutively"
495 );
496 return Some(TerminationReason::LoopStale);
497 }
498
499 let stop_path =
501 std::path::Path::new(&self.config.core.workspace_root).join(".ralph/stop-requested");
502 if stop_path.exists() {
503 let _ = std::fs::remove_file(&stop_path);
504 return Some(TerminationReason::Stopped);
505 }
506
507 let restart_path =
509 std::path::Path::new(&self.config.core.workspace_root).join(".ralph/restart-requested");
510 if restart_path.exists() {
511 return Some(TerminationReason::RestartRequested);
512 }
513
514 if !std::path::Path::new(&self.config.core.workspace_root).is_dir() {
516 return Some(TerminationReason::WorkspaceGone);
517 }
518
519 None
520 }
521
522 pub fn check_cancellation_event(&mut self) -> Option<TerminationReason> {
527 if !self.state.cancellation_requested {
528 return None;
529 }
530 self.state.cancellation_requested = false;
531 info!("Loop cancelled gracefully via loop.cancel event");
532
533 self.diagnostics.log_orchestration(
534 self.state.iteration,
535 "loop",
536 crate::diagnostics::OrchestrationEvent::LoopTerminated {
537 reason: "cancelled".to_string(),
538 },
539 );
540
541 Some(TerminationReason::Cancelled)
542 }
543
544 pub fn check_completion_event(&mut self) -> Option<TerminationReason> {
548 if !self.state.completion_requested {
549 return None;
550 }
551
552 let required = &self.config.event_loop.required_events;
554 if !required.is_empty() {
555 let missing = self.state.missing_required_events(required);
556 if !missing.is_empty() {
557 warn!(
558 missing = ?missing,
559 "Rejecting LOOP_COMPLETE: required events not seen during loop lifetime"
560 );
561 self.state.completion_requested = false;
562
563 let resume_payload = format!(
565 "LOOP_COMPLETE rejected: missing required events: {:?}. \
566 The agent must complete all workflow phases before emitting LOOP_COMPLETE. \
567 Use loop.cancel to abort the workflow instead.",
568 missing
569 );
570 self.bus.publish(Event::new("task.resume", resume_payload));
571 return None;
572 }
573 }
574
575 self.state.completion_requested = false;
576
577 if self.config.event_loop.persistent {
579 info!("Completion event suppressed - persistent mode active, loop staying alive");
580
581 self.diagnostics.log_orchestration(
582 self.state.iteration,
583 "loop",
584 crate::diagnostics::OrchestrationEvent::LoopTerminated {
585 reason: "completion_event_suppressed_persistent".to_string(),
586 },
587 );
588
589 let resume_event = Event::new(
591 "task.resume",
592 "Persistent mode: loop staying alive after completion signal. \
593 Check for new tasks or await human guidance.",
594 );
595 self.bus.publish(resume_event);
596
597 return None;
598 }
599
600 if self.config.memories.enabled {
602 if let Ok(false) = self.verify_tasks_complete() {
603 let open_tasks = self.get_open_task_list();
604 warn!(
605 open_tasks = ?open_tasks,
606 "Completion event with {} open task(s) - trusting agent decision",
607 open_tasks.len()
608 );
609 }
610 } else if let Ok(false) = self.verify_scratchpad_complete() {
611 warn!("Completion event with pending scratchpad tasks - trusting agent decision");
612 }
613
614 info!("Completion event detected - terminating");
615
616 self.diagnostics.log_orchestration(
618 self.state.iteration,
619 "loop",
620 crate::diagnostics::OrchestrationEvent::LoopTerminated {
621 reason: "completion_event".to_string(),
622 },
623 );
624
625 Some(TerminationReason::CompletionPromise)
626 }
627
628 pub fn initialize(&mut self, prompt_content: &str) {
630 let topic = self
632 .config
633 .event_loop
634 .starting_event
635 .clone()
636 .unwrap_or_else(|| "task.start".to_string());
637 self.initialize_with_topic(&topic, prompt_content);
638 }
639
640 pub fn initialize_resume(&mut self, prompt_content: &str) {
645 self.initialize_with_topic("task.resume", prompt_content);
647 }
648
649 fn initialize_with_topic(&mut self, topic: &str, prompt_content: &str) {
651 self.ralph.set_objective(prompt_content.to_string());
655
656 let start_event = Event::new(topic, prompt_content);
657 self.bus.publish(start_event);
658 debug!(topic = topic, "Published {} event", topic);
659 }
660
661 pub fn next_hat(&self) -> Option<&HatId> {
670 let next = self.bus.next_hat_with_pending();
671
672 if next.is_none() && self.bus.has_human_pending() {
674 return self.bus.hat_ids().find(|id| id.as_str() == "ralph");
675 }
676
677 next.as_ref()?;
679
680 if self.registry.is_empty() {
683 next
685 } else {
686 self.bus.hat_ids().find(|id| id.as_str() == "ralph")
689 }
690 }
691
692 pub fn has_pending_events(&self) -> bool {
697 self.bus.next_hat_with_pending().is_some() || self.bus.has_human_pending()
698 }
699
700 pub fn has_pending_human_events(&self) -> bool {
705 self.bus.has_human_pending()
706 }
707
708 pub fn has_pending_plan_events_in_jsonl(&self) -> std::io::Result<bool> {
713 let result = self.event_reader.peek_new_events()?;
714 Ok(result
715 .events
716 .iter()
717 .any(|event| event.topic.starts_with("plan.")))
718 }
719
720 pub fn pending_human_interact_context_in_jsonl(&self) -> std::io::Result<Option<Value>> {
723 let result = self.event_reader.peek_new_events()?;
724 Ok(result
725 .events
726 .iter()
727 .find(|event| event.topic == "human.interact")
728 .map(|event| {
729 Self::parse_human_interact_context(event.payload.as_deref().unwrap_or_default())
730 }))
731 }
732
733 pub fn get_hat_publishes(&self, hat_id: &HatId) -> Vec<String> {
737 self.registry
738 .get(hat_id)
739 .map(|hat| hat.publishes.iter().map(|t| t.to_string()).collect())
740 .unwrap_or_default()
741 }
742
743 pub fn inject_fallback_event(&mut self) -> bool {
750 let fallback_event = Event::new(
751 "task.resume",
752 "RECOVERY: Previous iteration did not publish an event. \
753 Review the scratchpad and either dispatch the next task or complete the loop.",
754 );
755
756 let fallback_event = match &self.state.last_hat {
759 Some(hat_id) if hat_id.as_str() != "ralph" => {
760 debug!(
761 hat = %hat_id.as_str(),
762 "Injecting fallback event to recover - targeting last hat with task.resume"
763 );
764 fallback_event.with_target(hat_id.clone())
765 }
766 _ => {
767 debug!("Injecting fallback event to recover - triggering Ralph with task.resume");
768 fallback_event
769 }
770 };
771
772 self.bus.publish(fallback_event);
773 true
774 }
775
776 pub fn build_prompt(&mut self, hat_id: &HatId) -> Option<String> {
790 if hat_id.as_str() == "ralph" {
793 if self.registry.is_empty() {
794 let mut events = self.bus.take_pending(&hat_id.clone());
796 let mut human_events = self.bus.take_human_pending();
797 events.append(&mut human_events);
798
799 let (guidance_events, regular_events): (Vec<_>, Vec<_>) = events
801 .into_iter()
802 .partition(|e| e.topic.as_str() == "human.guidance");
803
804 let events_context = regular_events
805 .iter()
806 .map(|e| Self::format_event(e))
807 .collect::<Vec<_>>()
808 .join("\n");
809
810 self.update_robot_guidance(guidance_events);
812 self.apply_robot_guidance();
813
814 let base_prompt = self.ralph.build_prompt(&events_context, &[]);
816 self.ralph.clear_robot_guidance();
817 let with_skills = self.prepend_auto_inject_skills(base_prompt);
818 let with_scratchpad = self.prepend_scratchpad(with_skills);
819 let final_prompt = self.prepend_ready_tasks(with_scratchpad);
820
821 debug!("build_prompt: routing to HatlessRalph (solo mode)");
822 return Some(final_prompt);
823 } else {
824 let mut all_hat_ids: Vec<HatId> = self.bus.hat_ids().cloned().collect();
826 all_hat_ids.sort_by(|a, b| a.as_str().cmp(b.as_str()));
828
829 let mut all_events = Vec::new();
830 let mut system_events = Vec::new();
831
832 for id in &all_hat_ids {
833 let pending = self.bus.take_pending(id);
834 if pending.is_empty() {
835 continue;
836 }
837
838 let (drop_pending, exhausted_event) = self.check_hat_exhaustion(id, &pending);
839 if drop_pending {
840 if let Some(exhausted_event) = exhausted_event {
842 all_events.push(exhausted_event.clone());
843 system_events.push(exhausted_event);
844 }
845 continue;
846 }
847
848 all_events.extend(pending);
849 }
850
851 let mut human_events = self.bus.take_human_pending();
852 all_events.append(&mut human_events);
853
854 for event in system_events {
857 self.bus.publish(event);
858 }
859
860 let (guidance_events, regular_events): (Vec<_>, Vec<_>) = all_events
862 .into_iter()
863 .partition(|e| e.topic.as_str() == "human.guidance");
864
865 self.update_robot_guidance(guidance_events);
868 self.apply_robot_guidance();
869
870 let active_hat_ids = self.determine_active_hat_ids(®ular_events);
872 self.record_hat_activations(&active_hat_ids);
873 self.state.last_active_hat_ids = active_hat_ids.clone();
874 let active_hats = self.determine_active_hats(®ular_events);
875
876 let events_context = regular_events
878 .iter()
879 .map(|e| Self::format_event(e))
880 .collect::<Vec<_>>()
881 .join("\n");
882
883 let base_prompt = self.ralph.build_prompt(&events_context, &active_hats);
885
886 debug!(
888 "build_prompt: routing to HatlessRalph (multi-hat coordinator mode), active_hats: {:?}",
889 active_hats
890 .iter()
891 .map(|h| h.id.as_str())
892 .collect::<Vec<_>>()
893 );
894
895 self.ralph.clear_robot_guidance();
897 let with_skills = self.prepend_auto_inject_skills(base_prompt);
898 let with_scratchpad = self.prepend_scratchpad(with_skills);
899 let final_prompt = self.prepend_ready_tasks(with_scratchpad);
900
901 return Some(final_prompt);
902 }
903 }
904
905 let events = self.bus.take_pending(&hat_id.clone());
909 let events_context = events
910 .iter()
911 .map(|e| Self::format_event(e))
912 .collect::<Vec<_>>()
913 .join("\n");
914
915 let hat = self.registry.get(hat_id)?;
916
917 debug!(
919 "build_prompt: hat_id='{}', instructions.is_empty()={}",
920 hat_id.as_str(),
921 hat.instructions.is_empty()
922 );
923
924 debug!(
926 "build_prompt: routing to build_custom_hat() for '{}'",
927 hat_id.as_str()
928 );
929 Some(
930 self.instruction_builder
931 .build_custom_hat(hat, &events_context),
932 )
933 }
934
935 fn update_robot_guidance(&mut self, guidance_events: Vec<Event>) {
941 if guidance_events.is_empty() {
942 return;
943 }
944
945 self.persist_guidance_to_scratchpad(&guidance_events);
947
948 self.robot_guidance
949 .extend(guidance_events.into_iter().map(|e| e.payload));
950 }
951
952 fn persist_guidance_to_scratchpad(&self, guidance_events: &[Event]) {
957 use std::io::Write;
958
959 let scratchpad_path = self.scratchpad_path();
960 let resolved_path = if scratchpad_path.is_relative() {
961 self.config.core.workspace_root.join(&scratchpad_path)
962 } else {
963 scratchpad_path
964 };
965
966 if let Some(parent) = resolved_path.parent()
968 && !parent.exists()
969 && let Err(e) = std::fs::create_dir_all(parent)
970 {
971 warn!("Failed to create scratchpad directory: {}", e);
972 return;
973 }
974
975 let mut file = match std::fs::OpenOptions::new()
976 .create(true)
977 .append(true)
978 .open(&resolved_path)
979 {
980 Ok(f) => f,
981 Err(e) => {
982 warn!("Failed to open scratchpad for guidance persistence: {}", e);
983 return;
984 }
985 };
986
987 let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
988 for event in guidance_events {
989 let entry = format!(
990 "\n### HUMAN GUIDANCE ({})\n\n{}\n",
991 timestamp, event.payload
992 );
993 if let Err(e) = file.write_all(entry.as_bytes()) {
994 warn!("Failed to write guidance to scratchpad: {}", e);
995 }
996 }
997
998 info!(
999 count = guidance_events.len(),
1000 "Persisted human guidance to scratchpad"
1001 );
1002 }
1003
1004 fn apply_robot_guidance(&mut self) {
1006 if self.robot_guidance.is_empty() {
1007 return;
1008 }
1009
1010 self.ralph.set_robot_guidance(self.robot_guidance.clone());
1011 }
1012
1013 fn prepend_auto_inject_skills(&self, prompt: String) -> String {
1023 let mut prefix = String::new();
1024
1025 self.inject_memories_and_tools_skill(&mut prefix);
1027
1028 self.inject_robot_skill(&mut prefix);
1030
1031 self.inject_custom_auto_skills(&mut prefix);
1033
1034 if prefix.is_empty() {
1035 return prompt;
1036 }
1037
1038 prefix.push_str("\n\n");
1039 prefix.push_str(&prompt);
1040 prefix
1041 }
1042
1043 fn inject_memories_and_tools_skill(&self, prefix: &mut String) {
1051 let memories_config = &self.config.memories;
1052
1053 if memories_config.enabled && memories_config.inject == InjectMode::Auto {
1055 info!(
1056 "Memory injection check: enabled={}, inject={:?}, workspace_root={:?}",
1057 memories_config.enabled, memories_config.inject, self.config.core.workspace_root
1058 );
1059
1060 let workspace_root = &self.config.core.workspace_root;
1061 let store = MarkdownMemoryStore::with_default_path(workspace_root);
1062 let memories_path = workspace_root.join(".ralph/agent/memories.md");
1063
1064 info!(
1065 "Looking for memories at: {:?} (exists: {})",
1066 memories_path,
1067 memories_path.exists()
1068 );
1069
1070 let memories = match store.load() {
1071 Ok(memories) => {
1072 info!("Successfully loaded {} memories from store", memories.len());
1073 memories
1074 }
1075 Err(e) => {
1076 info!(
1077 "Failed to load memories for injection: {} (path: {:?})",
1078 e, memories_path
1079 );
1080 Vec::new()
1081 }
1082 };
1083
1084 if memories.is_empty() {
1085 info!("Memory store is empty - no memories to inject");
1086 } else {
1087 let mut memories_content = format_memories_as_markdown(&memories);
1088
1089 if memories_config.budget > 0 {
1090 let original_len = memories_content.len();
1091 memories_content =
1092 truncate_to_budget(&memories_content, memories_config.budget);
1093 debug!(
1094 "Applied budget: {} chars -> {} chars (budget: {})",
1095 original_len,
1096 memories_content.len(),
1097 memories_config.budget
1098 );
1099 }
1100
1101 info!(
1102 "Injecting {} memories ({} chars) into prompt",
1103 memories.len(),
1104 memories_content.len()
1105 );
1106
1107 prefix.push_str(&memories_content);
1108 }
1109 }
1110
1111 if memories_config.enabled || self.config.tasks.enabled {
1113 if let Some(skill) = self.skill_registry.get("ralph-tools") {
1114 if !prefix.is_empty() {
1115 prefix.push_str("\n\n");
1116 }
1117 prefix.push_str(&format!(
1118 "<ralph-tools-skill>\n{}\n</ralph-tools-skill>",
1119 skill.content.trim()
1120 ));
1121 debug!("Injected ralph-tools skill from registry");
1122 } else {
1123 debug!("ralph-tools skill not found in registry - skill content not injected");
1124 }
1125 }
1126 }
1127
1128 fn inject_robot_skill(&self, prefix: &mut String) {
1133 if !self.config.robot.enabled {
1134 return;
1135 }
1136
1137 if let Some(skill) = self.skill_registry.get("robot-interaction") {
1138 if !prefix.is_empty() {
1139 prefix.push_str("\n\n");
1140 }
1141 prefix.push_str(&format!(
1142 "<robot-skill>\n{}\n</robot-skill>",
1143 skill.content.trim()
1144 ));
1145 debug!("Injected robot interaction skill from registry");
1146 }
1147 }
1148
1149 fn inject_custom_auto_skills(&self, prefix: &mut String) {
1151 for skill in self.skill_registry.auto_inject_skills(None) {
1152 if skill.name == "ralph-tools" || skill.name == "robot-interaction" {
1154 continue;
1155 }
1156
1157 if !prefix.is_empty() {
1158 prefix.push_str("\n\n");
1159 }
1160 prefix.push_str(&format!(
1161 "<{name}-skill>\n{content}\n</{name}-skill>",
1162 name = skill.name,
1163 content = skill.content.trim()
1164 ));
1165 debug!("Injected auto-inject skill: {}", skill.name);
1166 }
1167 }
1168
1169 fn prepend_scratchpad(&self, prompt: String) -> String {
1175 let scratchpad_path = self.scratchpad_path();
1176
1177 let resolved_path = if scratchpad_path.is_relative() {
1178 self.config.core.workspace_root.join(&scratchpad_path)
1179 } else {
1180 scratchpad_path
1181 };
1182
1183 if !resolved_path.exists() {
1184 debug!(
1185 "Scratchpad not found at {:?}, skipping injection",
1186 resolved_path
1187 );
1188 return prompt;
1189 }
1190
1191 let content = match std::fs::read_to_string(&resolved_path) {
1192 Ok(c) => c,
1193 Err(e) => {
1194 info!("Failed to read scratchpad for injection: {}", e);
1195 return prompt;
1196 }
1197 };
1198
1199 if content.trim().is_empty() {
1200 debug!("Scratchpad is empty, skipping injection");
1201 return prompt;
1202 }
1203
1204 let char_budget = 4000 * 4;
1206 let content = if content.len() > char_budget {
1207 let start = content.len() - char_budget;
1209 let start = floor_char_boundary(&content, start);
1211 let line_start = content[start..].find('\n').map_or(start, |n| start + n + 1);
1212 let discarded = &content[..line_start];
1213
1214 let headings: Vec<&str> = discarded
1216 .lines()
1217 .filter(|line| line.starts_with('#'))
1218 .collect();
1219 let summary = if headings.is_empty() {
1220 format!(
1221 "<!-- earlier content truncated ({} chars omitted) -->",
1222 line_start
1223 )
1224 } else {
1225 format!(
1226 "<!-- earlier content truncated ({} chars omitted) -->\n\
1227 <!-- discarded sections: {} -->",
1228 line_start,
1229 headings.join(" | ")
1230 )
1231 };
1232
1233 format!("{}\n\n{}", summary, &content[line_start..])
1234 } else {
1235 content
1236 };
1237
1238 info!("Injecting scratchpad ({} chars) into prompt", content.len());
1239
1240 let mut final_prompt = format!(
1241 "<scratchpad path=\"{}\">\n{}\n</scratchpad>\n\n",
1242 self.config.core.scratchpad, content
1243 );
1244 final_prompt.push_str(&prompt);
1245 final_prompt
1246 }
1247
1248 fn prepend_ready_tasks(&self, prompt: String) -> String {
1254 if !self.config.tasks.enabled {
1255 return prompt;
1256 }
1257
1258 use crate::task::TaskStatus;
1259 use crate::task_store::TaskStore;
1260
1261 let tasks_path = self.tasks_path();
1262 let resolved_path = if tasks_path.is_relative() {
1263 self.config.core.workspace_root.join(&tasks_path)
1264 } else {
1265 tasks_path
1266 };
1267
1268 if !resolved_path.exists() {
1269 return prompt;
1270 }
1271
1272 let store = match TaskStore::load(&resolved_path) {
1273 Ok(s) => s,
1274 Err(e) => {
1275 info!("Failed to load task store for injection: {}", e);
1276 return prompt;
1277 }
1278 };
1279
1280 let ready = store.ready();
1281 let open = store.open();
1282 let closed_count = store.all().len() - open.len();
1283
1284 if open.is_empty() && closed_count == 0 {
1285 return prompt;
1286 }
1287
1288 let mut section = String::from("<ready-tasks>\n");
1289 if ready.is_empty() && open.is_empty() {
1290 section.push_str("No open tasks. Create tasks with `ralph tools task add`.\n");
1291 } else {
1292 section.push_str(&format!(
1293 "## Tasks: {} ready, {} open, {} closed\n\n",
1294 ready.len(),
1295 open.len(),
1296 closed_count
1297 ));
1298 for task in &ready {
1299 let status_icon = match task.status {
1300 TaskStatus::Open => "[ ]",
1301 TaskStatus::InProgress => "[~]",
1302 _ => "[?]",
1303 };
1304 section.push_str(&format!(
1305 "- {} [P{}] {} ({})\n",
1306 status_icon, task.priority, task.title, task.id
1307 ));
1308 }
1309 let ready_ids: Vec<&str> = ready.iter().map(|t| t.id.as_str()).collect();
1311 let blocked: Vec<_> = open
1312 .iter()
1313 .filter(|t| !ready_ids.contains(&t.id.as_str()))
1314 .collect();
1315 if !blocked.is_empty() {
1316 section.push_str("\nBlocked:\n");
1317 for task in blocked {
1318 section.push_str(&format!(
1319 "- [blocked] [P{}] {} ({}) — blocked by: {}\n",
1320 task.priority,
1321 task.title,
1322 task.id,
1323 task.blocked_by.join(", ")
1324 ));
1325 }
1326 }
1327 }
1328 section.push_str("</ready-tasks>\n\n");
1329
1330 info!(
1331 "Injecting ready tasks ({} ready, {} open, {} closed) into prompt",
1332 ready.len(),
1333 open.len(),
1334 closed_count
1335 );
1336
1337 let mut final_prompt = section;
1338 final_prompt.push_str(&prompt);
1339 final_prompt
1340 }
1341
1342 pub fn build_ralph_prompt(&self, prompt_content: &str) -> String {
1344 self.ralph.build_prompt(prompt_content, &[])
1345 }
1346
1347 fn determine_active_hats(&self, events: &[Event]) -> Vec<&Hat> {
1350 let mut active_hats = Vec::new();
1351 for id in self.determine_active_hat_ids(events) {
1352 if let Some(hat) = self.registry.get(&id) {
1353 active_hats.push(hat);
1354 }
1355 }
1356 active_hats
1357 }
1358
1359 fn determine_active_hat_ids(&self, events: &[Event]) -> Vec<HatId> {
1360 let mut active_hat_ids = Vec::new();
1361 for event in events {
1362 if let Some(hat) = self.registry.get_for_topic(event.topic.as_str()) {
1363 if !active_hat_ids.iter().any(|id| id == &hat.id) {
1365 active_hat_ids.push(hat.id.clone());
1366 }
1367 }
1368 }
1369 active_hat_ids
1370 }
1371
1372 fn format_event(event: &Event) -> String {
1377 let topic = &event.topic;
1378 let payload = &event.payload;
1379
1380 if topic.as_str() == "task.start" || topic.as_str() == "task.resume" {
1381 format!(
1382 "Event: {} - <top-level-prompt>\n{}\n</top-level-prompt>",
1383 topic, payload
1384 )
1385 } else {
1386 format!("Event: {} - {}", topic, payload)
1387 }
1388 }
1389
1390 fn check_hat_exhaustion(&mut self, hat_id: &HatId, dropped: &[Event]) -> (bool, Option<Event>) {
1391 let Some(config) = self.registry.get_config(hat_id) else {
1392 return (false, None);
1393 };
1394 let Some(max) = config.max_activations else {
1395 return (false, None);
1396 };
1397
1398 let count = *self.state.hat_activation_counts.get(hat_id).unwrap_or(&0);
1399 if count < max {
1400 return (false, None);
1401 }
1402
1403 let should_emit = self.state.exhausted_hats.insert(hat_id.clone());
1405
1406 if !should_emit {
1407 return (true, None);
1409 }
1410
1411 let mut dropped_topics: Vec<String> = dropped.iter().map(|e| e.topic.to_string()).collect();
1412 dropped_topics.sort();
1413
1414 let payload = format!(
1415 "Hat '{hat}' exhausted.\n- max_activations: {max}\n- activations: {count}\n- dropped_topics:\n - {topics}",
1416 hat = hat_id.as_str(),
1417 max = max,
1418 count = count,
1419 topics = dropped_topics.join("\n - ")
1420 );
1421
1422 warn!(
1423 hat = %hat_id.as_str(),
1424 max_activations = max,
1425 activations = count,
1426 "Hat exhausted (max_activations reached)"
1427 );
1428
1429 (
1430 true,
1431 Some(Event::new(
1432 format!("{}.exhausted", hat_id.as_str()),
1433 payload,
1434 )),
1435 )
1436 }
1437
1438 fn record_hat_activations(&mut self, active_hat_ids: &[HatId]) {
1439 for hat_id in active_hat_ids {
1440 *self
1441 .state
1442 .hat_activation_counts
1443 .entry(hat_id.clone())
1444 .or_insert(0) += 1;
1445 }
1446 }
1447
1448 pub fn get_active_hat_id(&self) -> HatId {
1452 for hat_id in self.bus.hat_ids() {
1454 let Some(events) = self.bus.peek_pending(hat_id) else {
1455 continue;
1456 };
1457 let Some(event) = events.first() else {
1458 continue;
1459 };
1460 if let Some(active_hat) = self.registry.get_for_topic(event.topic.as_str()) {
1461 return active_hat.id.clone();
1462 }
1463 }
1464 HatId::new("ralph")
1465 }
1466
1467 pub fn check_default_publishes(&mut self, hat_id: &HatId) {
1477 if let Some(config) = self.registry.get_config(hat_id)
1478 && let Some(default_topic) = &config.default_publishes
1479 {
1480 let default_event = Event::new(default_topic.as_str(), "").with_source(hat_id.clone());
1481
1482 debug!(
1483 hat = %hat_id.as_str(),
1484 topic = %default_topic,
1485 "No events written by hat, injecting default_publishes event"
1486 );
1487
1488 self.state.record_topic(default_topic.as_str());
1489
1490 if default_topic.as_str() == self.config.event_loop.completion_promise {
1494 info!(
1495 hat = %hat_id.as_str(),
1496 topic = %default_topic,
1497 "default_publishes matches completion_promise — requesting termination"
1498 );
1499 self.state.completion_requested = true;
1500 }
1501
1502 self.bus.publish(default_event);
1503 }
1504 }
1505
1506 pub fn bus(&mut self) -> &mut EventBus {
1511 &mut self.bus
1512 }
1513
1514 pub fn process_output(
1518 &mut self,
1519 hat_id: &HatId,
1520 output: &str,
1521 success: bool,
1522 ) -> Option<TerminationReason> {
1523 self.state.iteration += 1;
1524 self.state.last_hat = Some(hat_id.clone());
1525
1526 if let Some(interval_secs) = self.config.robot.checkin_interval_seconds
1528 && let Some(ref robot_service) = self.robot_service
1529 {
1530 let elapsed = self.state.elapsed();
1531 let interval = std::time::Duration::from_secs(interval_secs);
1532 let last = self
1533 .state
1534 .last_checkin_at
1535 .map(|t| t.elapsed())
1536 .unwrap_or(elapsed);
1537
1538 if last >= interval {
1539 let context = self.build_checkin_context(hat_id);
1540 match robot_service.send_checkin(self.state.iteration, elapsed, Some(&context)) {
1541 Ok(_) => {
1542 self.state.last_checkin_at = Some(std::time::Instant::now());
1543 debug!(iteration = self.state.iteration, "Sent robot check-in");
1544 }
1545 Err(e) => {
1546 warn!(error = %e, "Failed to send robot check-in");
1547 }
1548 }
1549 }
1550 }
1551
1552 self.diagnostics.log_orchestration(
1554 self.state.iteration,
1555 "loop",
1556 crate::diagnostics::OrchestrationEvent::IterationStarted,
1557 );
1558
1559 self.diagnostics.log_orchestration(
1561 self.state.iteration,
1562 "loop",
1563 crate::diagnostics::OrchestrationEvent::HatSelected {
1564 hat: hat_id.to_string(),
1565 reason: "process_output".to_string(),
1566 },
1567 );
1568
1569 if success {
1571 self.state.consecutive_failures = 0;
1572 } else {
1573 self.state.consecutive_failures += 1;
1574 }
1575
1576 let _ = output;
1577
1578 self.audit_file_modifications(hat_id);
1581
1582 self.check_termination()
1588 }
1589
1590 fn audit_file_modifications(&mut self, hat_id: &HatId) {
1596 let config = match self.registry.get_config(hat_id) {
1597 Some(c) => c,
1598 None => return,
1599 };
1600
1601 let has_write_restriction = config
1602 .disallowed_tools
1603 .iter()
1604 .any(|t| t == "Edit" || t == "Write");
1605
1606 if !has_write_restriction {
1607 return;
1608 }
1609
1610 let workspace = &self.config.core.workspace_root;
1611 let diff_output = std::process::Command::new("git")
1612 .args(["diff", "--stat", "HEAD"])
1613 .current_dir(workspace)
1614 .output();
1615
1616 match diff_output {
1617 Ok(output) if !output.stdout.is_empty() => {
1618 let diff_stat = String::from_utf8_lossy(&output.stdout).trim().to_string();
1619 warn!(
1620 hat = %hat_id.as_str(),
1621 diff = %diff_stat,
1622 "Hat modified files despite tool restrictions (scope violation)"
1623 );
1624
1625 let violation_topic = format!("{}.scope_violation", hat_id.as_str());
1626 let violation = Event::new(
1627 violation_topic.as_str(),
1628 format!(
1629 "Hat '{}' modified files with Edit/Write disallowed:\n{}",
1630 hat_id.as_str(),
1631 diff_stat
1632 ),
1633 );
1634 self.bus.publish(violation);
1635 }
1636 Err(e) => {
1637 debug!(error = %e, "Could not run git diff for file-modification audit");
1638 }
1639 _ => {} }
1641 }
1642
1643 fn extract_task_id(payload: &str) -> String {
1646 payload
1647 .lines()
1648 .next()
1649 .unwrap_or("unknown")
1650 .trim()
1651 .to_string()
1652 }
1653
1654 pub fn add_cost(&mut self, cost: f64) {
1656 self.state.cumulative_cost += cost;
1657 }
1658
1659 fn verify_scratchpad_complete(&self) -> Result<bool, std::io::Error> {
1666 let scratchpad_path = self.scratchpad_path();
1667
1668 if !scratchpad_path.exists() {
1669 return Err(std::io::Error::new(
1670 std::io::ErrorKind::NotFound,
1671 "Scratchpad does not exist",
1672 ));
1673 }
1674
1675 let content = std::fs::read_to_string(scratchpad_path)?;
1676
1677 let has_pending = content
1678 .lines()
1679 .any(|line| line.trim_start().starts_with("- [ ]"));
1680
1681 Ok(!has_pending)
1682 }
1683
1684 fn verify_tasks_complete(&self) -> Result<bool, std::io::Error> {
1685 use crate::task_store::TaskStore;
1686
1687 let tasks_path = self.tasks_path();
1688
1689 if !tasks_path.exists() {
1691 return Ok(true);
1692 }
1693
1694 let store = TaskStore::load(&tasks_path)?;
1695 Ok(!store.has_pending_tasks())
1696 }
1697
1698 fn build_checkin_context(&self, hat_id: &HatId) -> CheckinContext {
1700 let (open_tasks, closed_tasks) = self.count_tasks();
1701 CheckinContext {
1702 current_hat: Some(hat_id.as_str().to_string()),
1703 open_tasks,
1704 closed_tasks,
1705 cumulative_cost: self.state.cumulative_cost,
1706 }
1707 }
1708
1709 fn count_tasks(&self) -> (usize, usize) {
1714 use crate::task::TaskStatus;
1715 use crate::task_store::TaskStore;
1716
1717 let tasks_path = self.tasks_path();
1718 if !tasks_path.exists() {
1719 return (0, 0);
1720 }
1721
1722 match TaskStore::load(&tasks_path) {
1723 Ok(store) => {
1724 let total = store.all().len();
1725 let open = store.open().len();
1726 let closed = total - open;
1727 debug_assert_eq!(
1729 closed,
1730 store
1731 .all()
1732 .iter()
1733 .filter(|t| t.status == TaskStatus::Closed)
1734 .count()
1735 );
1736 (open, closed)
1737 }
1738 Err(_) => (0, 0),
1739 }
1740 }
1741
1742 fn get_open_task_list(&self) -> Vec<String> {
1744 use crate::task_store::TaskStore;
1745
1746 let tasks_path = self.tasks_path();
1747 if let Ok(store) = TaskStore::load(&tasks_path) {
1748 return store
1749 .open()
1750 .iter()
1751 .map(|t| format!("{}: {}", t.id, t.title))
1752 .collect();
1753 }
1754 vec![]
1755 }
1756
1757 fn warn_on_mutation_evidence(&self, evidence: &crate::event_parser::BackpressureEvidence) {
1758 let threshold = self.config.event_loop.mutation_score_warn_threshold;
1759
1760 match &evidence.mutants {
1761 Some(mutants) => {
1762 if let Some(reason) = Self::mutation_warning_reason(mutants, threshold) {
1763 warn!(
1764 reason = %reason,
1765 mutants_status = ?mutants.status,
1766 mutants_score = mutants.score_percent,
1767 mutants_threshold = threshold,
1768 "Mutation testing warning"
1769 );
1770 }
1771 }
1772 None => {
1773 if let Some(threshold) = threshold {
1774 warn!(
1775 mutants_threshold = threshold,
1776 "Mutation testing warning: missing mutation evidence in build.done payload"
1777 );
1778 }
1779 }
1780 }
1781 }
1782
1783 fn mutation_warning_reason(
1784 mutants: &MutationEvidence,
1785 threshold: Option<f64>,
1786 ) -> Option<String> {
1787 match mutants.status {
1788 MutationStatus::Fail => Some("mutation testing failed".to_string()),
1789 MutationStatus::Warn => Some(Self::format_mutation_message(
1790 "mutation score below threshold",
1791 mutants.score_percent,
1792 )),
1793 MutationStatus::Unknown => Some("mutation testing status unknown".to_string()),
1794 MutationStatus::Pass => {
1795 let threshold = threshold?;
1796
1797 match mutants.score_percent {
1798 Some(score) if score < threshold => Some(format!(
1799 "mutation score {:.2}% below threshold {:.2}%",
1800 score, threshold
1801 )),
1802 Some(_) => None,
1803 None => Some(format!(
1804 "mutation score missing (threshold {:.2}%)",
1805 threshold
1806 )),
1807 }
1808 }
1809 }
1810 }
1811
1812 fn format_mutation_message(message: &str, score: Option<f64>) -> String {
1813 match score {
1814 Some(score) => format!("{message} ({score:.2}%)"),
1815 None => message.to_string(),
1816 }
1817 }
1818
1819 fn parse_human_interact_context(payload: &str) -> Value {
1820 let mut context = match serde_json::from_str::<Value>(payload) {
1821 Ok(Value::Object(map)) => map,
1822 Ok(value) => {
1823 let mut map = Map::new();
1824 map.insert("question".to_string(), value);
1825 map
1826 }
1827 Err(_) => {
1828 let mut map = Map::new();
1829 map.insert("question".to_string(), Value::String(payload.to_string()));
1830 map
1831 }
1832 };
1833
1834 if !context.contains_key("question") {
1835 context.insert("question".to_string(), Value::String(payload.to_string()));
1836 }
1837
1838 Value::Object(context)
1839 }
1840
    /// Reads newly appended events from the JSONL event file, validates them, and
    /// publishes the accepted ones on the bus.
    ///
    /// Processing order:
    /// 1. Each malformed line is published as an `event.malformed` event and increments
    ///    `state.consecutive_malformed_events`. The counter resets to zero when the batch
    ///    contains at least one well-formed event.
    /// 2. If nothing was read (neither well-formed nor malformed), returns early with
    ///    every flag false.
    /// 3. When `event_loop.enforce_hat_scope` is on and `state.last_active_hat_ids` is
    ///    non-empty, events none of those hats may publish are dropped. Each dropped event
    ///    is replaced by a `<first active hat>.scope_violation` event published directly
    ///    on the bus.
    /// 4. The configured cancellation topic sets `state.cancellation_requested`. The
    ///    completion topic sets `state.completion_requested` only when it is the last event
    ///    of the batch. Neither event is republished.
    /// 5. `build.done`, `review.done` and `verify.passed` are gated on evidence parsed from
    ///    their payloads. On failure they are replaced by `build.blocked`,
    ///    `review.blocked` or `verify.failed` respectively. Other topics pass through.
    /// 6. `build.blocked` events update per-key block counts. At 3 blocks the key is
    ///    recorded as abandoned and `build.task.abandoned` is published.
    /// 7. For the first `human.interact` event, if a robot service is present, the payload
    ///    is sent as a question. The response (or a `human.timeout` event) is published
    ///    after the validated events, and the outcome is returned as
    ///    `human_interact_context`.
    ///
    /// # Errors
    /// Propagates the I/O error from `EventReader::read_new_events`.
    pub fn process_events_from_jsonl(&mut self) -> std::io::Result<ProcessedEvents> {
        let result = self.event_reader.read_new_events()?;

        // Surface every malformed line to the hats and count the malformed streak.
        for malformed in &result.malformed {
            let payload = format!(
                "Line {}: {}\nContent: {}",
                malformed.line_number, malformed.error, &malformed.content
            );
            let event = Event::new("event.malformed", &payload);
            self.bus.publish(event);
            self.state.consecutive_malformed_events += 1;
            warn!(
                line = malformed.line_number,
                consecutive = self.state.consecutive_malformed_events,
                "Malformed event line detected"
            );
        }

        // Any well-formed event breaks the malformed streak, even if malformed lines in
        // this same batch were counted above.
        if !result.events.is_empty() {
            self.state.consecutive_malformed_events = 0;
        }

        if result.events.is_empty() && result.malformed.is_empty() {
            return Ok(ProcessedEvents {
                had_events: false,
                had_plan_events: false,
                human_interact_context: None,
                has_orphans: false,
            });
        }

        // Optional hat-scope enforcement: keep only events that at least one hat from
        // `state.last_active_hat_ids` is allowed to publish.
        let events = if self.config.event_loop.enforce_hat_scope {
            let active_hats = self.state.last_active_hat_ids.clone();
            let (in_scope, out_of_scope): (Vec<_>, Vec<_>) =
                result.events.into_iter().partition(|event| {
                    if active_hats.is_empty() {
                        return true; // No recorded active hats: nothing to scope against.
                    }
                    active_hats
                        .iter()
                        .any(|hat_id| self.registry.can_publish(hat_id, event.topic.as_str()))
                });

            for event in &out_of_scope {
                // The violation is attributed to the first active hat only.
                let violation_hat = active_hats.first().map(|h| h.as_str()).unwrap_or("unknown");
                warn!(
                    active_hats = ?active_hats,
                    topic = %event.topic,
                    "Scope violation: active hat(s) cannot publish this topic — dropping event"
                );
                let violation_topic = format!("{}.scope_violation", violation_hat);
                let violation_payload = format!(
                    "Attempted to publish '{}': {}",
                    event.topic,
                    event.payload.clone().unwrap_or_default()
                );
                let violation = Event::new(violation_topic, violation_payload);
                self.bus.publish(violation);
            }

            in_scope
        } else {
            result.events
        };
        // Set below when a published topic has no subscribing hat in the registry.
        let mut has_orphans = false;

        let mut validated_events = Vec::new();
        let completion_topic = self.config.event_loop.completion_promise.as_str();
        let cancellation_topic = self.config.event_loop.cancellation_promise.clone();
        // NOTE(review): counted before the loop, so cancellation events also count toward
        // the "completion must be the last event" check below (a completion followed only
        // by a cancellation is ignored).
        let total_events = events.len();
        for (index, event) in events.into_iter().enumerate() {
            let payload = event.payload.clone().unwrap_or_default();

            // An empty cancellation_promise disables cancellation detection.
            if !cancellation_topic.is_empty() && event.topic.as_str() == cancellation_topic {
                info!(
                    payload = %payload,
                    "loop.cancel event detected — scheduling graceful termination"
                );
                self.state.cancellation_requested = true;
                continue;
            }

            // Completion is honored only as the final event of the batch. Either way the
            // completion event itself is not republished.
            if event.topic == completion_topic {
                if index + 1 == total_events {
                    self.state.completion_requested = true;
                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::EventPublished {
                            topic: event.topic.clone(),
                        },
                    );
                    info!(
                        topic = %event.topic,
                        "Completion event detected in JSONL"
                    );
                } else {
                    warn!(
                        topic = %event.topic,
                        index = index,
                        total_events = total_events,
                        "Completion event ignored because it was not the last event"
                    );
                }
                continue;
            }

            // Backpressure gates: a "done/passed" claim must carry evidence, otherwise it
            // is replaced by the corresponding "blocked/failed" event.
            if event.topic == "build.done" {
                if let Some(evidence) = EventParser::parse_backpressure_evidence(&payload) {
                    if evidence.all_passed() {
                        self.warn_on_mutation_evidence(&evidence);
                        validated_events.push(Event::new(event.topic.as_str(), &payload));
                    } else {
                        warn!(
                            tests = evidence.tests_passed,
                            lint = evidence.lint_passed,
                            typecheck = evidence.typecheck_passed,
                            audit = evidence.audit_passed,
                            coverage = evidence.coverage_passed,
                            complexity = evidence.complexity_score,
                            duplication = evidence.duplication_passed,
                            performance = evidence.performance_regression,
                            specs = evidence.specs_verified,
                            "build.done rejected: backpressure checks failed"
                        );

                        // Render optional evidence fields for the diagnostics reason string.
                        let complexity = evidence
                            .complexity_score
                            .map(|value| format!("{value:.2}"))
                            .unwrap_or_else(|| "missing".to_string());
                        let performance = match evidence.performance_regression {
                            Some(true) => "regression".to_string(),
                            Some(false) => "pass".to_string(),
                            None => "missing".to_string(),
                        };
                        let specs = match evidence.specs_verified {
                            Some(true) => "pass".to_string(),
                            Some(false) => "fail".to_string(),
                            None => "not reported".to_string(),
                        };

                        self.diagnostics.log_orchestration(
                            self.state.iteration,
                            "jsonl",
                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                                reason: format!(
                                    "backpressure checks failed: tests={}, lint={}, typecheck={}, audit={}, coverage={}, complexity={}, duplication={}, performance={}, specs={}",
                                    evidence.tests_passed,
                                    evidence.lint_passed,
                                    evidence.typecheck_passed,
                                    evidence.audit_passed,
                                    evidence.coverage_passed,
                                    complexity,
                                    evidence.duplication_passed,
                                    performance,
                                    specs
                                ),
                            },
                        );

                        validated_events.push(Event::new(
                            "build.blocked",
                            "Backpressure checks failed. Fix tests/lint/typecheck/audit/coverage/complexity/duplication/specs before emitting build.done.",
                        ));
                    }
                } else {
                    warn!("build.done rejected: missing backpressure evidence");

                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                            reason: "missing backpressure evidence".to_string(),
                        },
                    );

                    validated_events.push(Event::new(
                        "build.blocked",
                        "Missing backpressure evidence. Include 'tests: pass', 'lint: pass', 'typecheck: pass', 'audit: pass', 'coverage: pass', 'complexity: <score>', 'duplication: pass', 'performance: pass' (optional), 'specs: pass' (optional) in build.done payload.",
                    ));
                }
            } else if event.topic == "review.done" {
                if let Some(evidence) = EventParser::parse_review_evidence(&payload) {
                    if evidence.is_verified() {
                        validated_events.push(Event::new(event.topic.as_str(), &payload));
                    } else {
                        warn!(
                            tests = evidence.tests_passed,
                            build = evidence.build_passed,
                            "review.done rejected: verification checks failed"
                        );

                        self.diagnostics.log_orchestration(
                            self.state.iteration,
                            "jsonl",
                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                                reason: format!(
                                    "review verification failed: tests={}, build={}",
                                    evidence.tests_passed, evidence.build_passed
                                ),
                            },
                        );

                        validated_events.push(Event::new(
                            "review.blocked",
                            "Review verification failed. Run tests and build before emitting review.done.",
                        ));
                    }
                } else {
                    warn!("review.done rejected: missing verification evidence");

                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                            reason: "missing review verification evidence".to_string(),
                        },
                    );

                    validated_events.push(Event::new(
                        "review.blocked",
                        "Missing verification evidence. Include 'tests: pass' and 'build: pass' in review.done payload.",
                    ));
                }
            } else if event.topic == "verify.passed" {
                if let Some(report) = EventParser::parse_quality_report(&payload) {
                    if report.meets_thresholds() {
                        validated_events.push(Event::new(event.topic.as_str(), &payload));
                    } else {
                        let failed = report.failed_dimensions();
                        let reason = if failed.is_empty() {
                            "quality thresholds failed".to_string()
                        } else {
                            format!("quality thresholds failed: {}", failed.join(", "))
                        };

                        warn!(
                            failed_dimensions = ?failed,
                            "verify.passed rejected: quality thresholds failed"
                        );

                        self.diagnostics.log_orchestration(
                            self.state.iteration,
                            "jsonl",
                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                                reason,
                            },
                        );

                        validated_events.push(Event::new(
                            "verify.failed",
                            "Quality thresholds failed. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity with thresholds in verify.passed payload.",
                        ));
                    }
                } else {
                    warn!("verify.passed rejected: missing quality report");

                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                            reason: "missing quality report".to_string(),
                        },
                    );

                    validated_events.push(Event::new(
                        "verify.failed",
                        "Missing quality report. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity in verify.passed payload.",
                    ));
                }
            } else if event.topic == "verify.failed" {
                // Failures are always accepted; a missing report is only worth a warning.
                if EventParser::parse_quality_report(&payload).is_none() {
                    warn!("verify.failed missing quality report");
                }
                validated_events.push(Event::new(event.topic.as_str(), &payload));
            } else {
                validated_events.push(Event::new(event.topic.as_str(), &payload));
            }
        }

        // Track repeated build.blocked events, keyed by the first line of their payload.
        // NOTE(review): the build.blocked events synthesized above carry fixed messages, so
        // their key is that message text rather than a real task id. Also, counts are not
        // reset in this function, so "3 consecutive blocks" is effectively a cumulative
        // count per key — confirm whether they are reset elsewhere.
        let blocked_events: Vec<_> = validated_events
            .iter()
            .filter(|e| e.topic == "build.blocked".into())
            .collect();

        for blocked_event in &blocked_events {
            let task_id = Self::extract_task_id(&blocked_event.payload);

            let count = self
                .state
                .task_block_counts
                .entry(task_id.clone())
                .or_insert(0);
            *count += 1;

            debug!(
                task_id = %task_id,
                block_count = *count,
                "Task blocked"
            );

            // Abandon once per task id: later blocks past the threshold are not re-announced.
            if *count >= 3 && !self.state.abandoned_tasks.contains(&task_id) {
                warn!(
                    task_id = %task_id,
                    "Task abandoned after 3 consecutive blocks"
                );

                self.state.abandoned_tasks.push(task_id.clone());

                self.diagnostics.log_orchestration(
                    self.state.iteration,
                    "jsonl",
                    crate::diagnostics::OrchestrationEvent::TaskAbandoned {
                        reason: format!(
                            "3 consecutive build.blocked events for task '{}'",
                            task_id
                        ),
                    },
                );

                let abandoned_event = Event::new(
                    "build.task.abandoned",
                    format!(
                        "Task '{}' abandoned after 3 consecutive build.blocked events",
                        task_id
                    ),
                );

                self.bus.publish(abandoned_event);
            }
        }

        let has_blocked_event = !blocked_events.is_empty();

        // Batches without a build.blocked event break the blocked streak.
        if has_blocked_event {
            self.state.consecutive_blocked += 1;
        } else {
            self.state.consecutive_blocked = 0;
            self.state.last_blocked_hat = None;
        }

        // Human-in-the-loop: only the first human.interact event in the batch is handled.
        let mut response_event = None;
        let mut human_interact_context = None;
        let ask_human_idx = validated_events
            .iter()
            .position(|e| e.topic == "human.interact".into());

        if let Some(idx) = ask_human_idx {
            let ask_event = &validated_events[idx];
            let payload = ask_event.payload.clone();

            let mut context = match Self::parse_human_interact_context(&payload) {
                Value::Object(map) => map,
                _ => Map::new(),
            };

            if let Some(ref robot_service) = self.robot_service {
                info!(
                    payload = %payload,
                    "human.interact event detected — sending question via robot service"
                );

                let send_ok = match robot_service.send_question(&payload) {
                    Ok(_message_id) => true,
                    Err(e) => {
                        warn!(
                            error = %e,
                            "Failed to send human.interact question after retries — treating as timeout"
                        );
                        // NOTE(review): retry_count is hard-coded to 3; presumably mirrors
                        // the robot service's internal retry policy — confirm.
                        self.diagnostics.log_error(
                            self.state.iteration,
                            "telegram",
                            crate::diagnostics::DiagnosticError::TelegramSendError {
                                operation: "send_question".to_string(),
                                error: e.to_string(),
                                retry_count: 3,
                            },
                        );
                        context.insert(
                            "outcome".to_string(),
                            Value::String("send_failure".to_string()),
                        );
                        context.insert("error".to_string(), Value::String(e.to_string()));
                        false
                    }
                };

                if send_ok {
                    // Resolve the events file to watch for the answer, in this order:
                    // 1. the loop context's current-events marker, resolved relative to
                    //    that context's workspace;
                    // 2. `.ralph/current-events`, resolved relative to the process's
                    //    working directory;
                    // 3. the default events path.
                    let events_path = self
                        .loop_context
                        .as_ref()
                        .and_then(|ctx| {
                            std::fs::read_to_string(ctx.current_events_marker())
                                .ok()
                                .map(|s| ctx.workspace().join(s.trim()))
                        })
                        .or_else(|| {
                            std::fs::read_to_string(".ralph/current-events")
                                .ok()
                                .map(|s| PathBuf::from(s.trim()))
                        })
                        .unwrap_or_else(|| {
                            self.loop_context
                                .as_ref()
                                .map(|ctx| ctx.events_path())
                                .unwrap_or_else(|| PathBuf::from(".ralph/events.jsonl"))
                        });

                    // NOTE(review): presumably blocks until a response arrives or the robot
                    // service's timeout elapses — confirm in the RobotService implementation.
                    match robot_service.wait_for_response(&events_path) {
                        Ok(Some(response)) => {
                            info!(
                                response = %response,
                                "Received human.response — continuing loop"
                            );
                            context.insert(
                                "outcome".to_string(),
                                Value::String("response".to_string()),
                            );
                            context.insert("response".to_string(), Value::String(response.clone()));
                            response_event = Some(Event::new("human.response", &response));
                        }
                        Ok(None) => {
                            warn!(
                                timeout_secs = robot_service.timeout_secs(),
                                "Human response timeout — injecting human.timeout event"
                            );
                            context.insert(
                                "outcome".to_string(),
                                Value::String("timeout".to_string()),
                            );
                            context.insert(
                                "timeout_seconds".to_string(),
                                Value::from(robot_service.timeout_secs()),
                            );
                            let timeout_event = Event::new(
                                "human.timeout",
                                format!(
                                    "No response after {}s. Original question: {}",
                                    robot_service.timeout_secs(),
                                    payload
                                ),
                            );
                            response_event = Some(timeout_event);
                        }
                        Err(e) => {
                            warn!(
                                error = %e,
                                "Error waiting for human response — injecting human.timeout event"
                            );
                            context.insert(
                                "outcome".to_string(),
                                Value::String("wait_error".to_string()),
                            );
                            context.insert("error".to_string(), Value::String(e.to_string()));
                            let timeout_event = Event::new(
                                "human.timeout",
                                format!(
                                    "Error waiting for response: {}. Original question: {}",
                                    e, payload
                                ),
                            );
                            response_event = Some(timeout_event);
                        }
                    }
                }
            } else {
                debug!(
                    "human.interact event detected but no robot service active — passing through"
                );
                context.insert(
                    "outcome".to_string(),
                    Value::String("no_robot_service".to_string()),
                );
            }

            human_interact_context = Some(Value::Object(context));
        }

        // Computed from validated events only, so completion/cancellation events and
        // dropped out-of-scope events do not count.
        let had_events = !validated_events.is_empty();
        let had_plan_events = validated_events
            .iter()
            .any(|event| event.topic.as_str().starts_with("plan."));

        for event in validated_events {
            self.state.record_topic(event.topic.as_str());

            self.diagnostics.log_orchestration(
                self.state.iteration,
                "jsonl",
                crate::diagnostics::OrchestrationEvent::EventPublished {
                    topic: event.topic.to_string(),
                },
            );

            if !self.registry.has_subscriber(event.topic.as_str()) {
                has_orphans = true;
            }

            debug!(
                topic = %event.topic,
                "Publishing event from JSONL"
            );
            self.bus.publish(event);
        }

        // The human response (or timeout) is published after the batch, so it follows the
        // human.interact event on the bus. The log message below is also emitted for
        // human.timeout events.
        if let Some(response) = response_event {
            self.state.record_topic(response.topic.as_str());
            info!(
                topic = %response.topic,
                "Publishing human.response event from robot service"
            );
            self.bus.publish(response);
        }

        Ok(ProcessedEvents {
            had_events,
            had_plan_events,
            human_interact_context,
            has_orphans,
        })
    }
2410
2411 pub fn check_ralph_completion(&self, output: &str) -> bool {
2415 let events = EventParser::new().parse(output);
2416 events
2417 .iter()
2418 .any(|event| event.topic.as_str() == self.config.event_loop.completion_promise)
2419 }
2420
2421 pub fn publish_terminate_event(&mut self, reason: &TerminationReason) -> Event {
2428 self.stop_robot_service();
2430
2431 let elapsed = self.state.elapsed();
2432 let duration_str = format_duration(elapsed);
2433
2434 let payload = format!(
2435 "## Reason\n{}\n\n## Status\n{}\n\n## Summary\n- Iterations: {}\n- Duration: {}\n- Exit code: {}",
2436 reason.as_str(),
2437 termination_status_text(reason),
2438 self.state.iteration,
2439 duration_str,
2440 reason.exit_code()
2441 );
2442
2443 let event = Event::new("loop.terminate", &payload);
2444
2445 self.bus.publish(event.clone());
2447
2448 info!(
2449 reason = %reason.as_str(),
2450 iterations = self.state.iteration,
2451 duration = %duration_str,
2452 "Wrapping up: {}. {} iterations in {}.",
2453 reason.as_str(),
2454 self.state.iteration,
2455 duration_str
2456 );
2457
2458 event
2459 }
2460
2461 pub fn robot_shutdown_flag(&self) -> Option<Arc<AtomicBool>> {
2466 self.robot_service.as_ref().map(|s| s.shutdown_flag())
2467 }
2468
2469 fn stop_robot_service(&mut self) {
2473 if let Some(service) = self.robot_service.take() {
2474 service.stop();
2475 }
2476 }
2477
2478 pub fn check_for_user_prompt(&self, events: &[Event]) -> Option<UserPrompt> {
2486 events
2487 .iter()
2488 .find(|e| e.topic.as_str() == "user.prompt")
2489 .map(|e| UserPrompt {
2490 id: Self::extract_prompt_id(&e.payload),
2491 text: e.payload.clone(),
2492 })
2493 }
2494
2495 fn extract_prompt_id(payload: &str) -> String {
2500 if let Some(start) = payload.find("id=\"")
2502 && let Some(end) = payload[start + 4..].find('"')
2503 {
2504 return payload[start + 4..start + 4 + end].to_string();
2505 }
2506
2507 format!("q{}", Self::generate_prompt_id())
2509 }
2510
2511 fn generate_prompt_id() -> String {
2514 use std::time::{SystemTime, UNIX_EPOCH};
2515 let nanos = SystemTime::now()
2516 .duration_since(UNIX_EPOCH)
2517 .unwrap()
2518 .as_nanos();
2519 format!("{:x}", nanos % 0xFFFF_FFFF)
2520 }
2521}
2522
/// Prompt carried by a `user.prompt` event.
#[derive(Debug, Clone)]
pub struct UserPrompt {
    /// Prompt identifier. `check_for_user_prompt` takes it from the payload's `id="..."`
    /// attribute, or generates one when the attribute is absent.
    pub id: String,
    /// Prompt text. `check_for_user_prompt` stores the full event payload here.
    pub text: String,
}
2533
/// Formats a duration as `Xh Ym Zs`, omitting leading zero units.
///
/// Examples: `1h 0m 5s`, `2m 5s`, `7s`. Sub-second precision is discarded.
fn format_duration(d: Duration) -> String {
    let secs = d.as_secs();
    let (h, m, s) = (secs / 3600, secs / 60 % 60, secs % 60);

    match (h, m) {
        (0, 0) => format!("{}s", s),
        (0, _) => format!("{}m {}s", m, s),
        _ => format!("{}h {}m {}s", h, m, s),
    }
}
2549
2550fn termination_status_text(reason: &TerminationReason) -> &'static str {
2552 match reason {
2553 TerminationReason::CompletionPromise => "All tasks completed successfully.",
2554 TerminationReason::MaxIterations => "Stopped at iteration limit.",
2555 TerminationReason::MaxRuntime => "Stopped at runtime limit.",
2556 TerminationReason::MaxCost => "Stopped at cost limit.",
2557 TerminationReason::ConsecutiveFailures => "Too many consecutive failures.",
2558 TerminationReason::LoopThrashing => {
2559 "Loop thrashing detected - same hat repeatedly blocked."
2560 }
2561 TerminationReason::LoopStale => {
2562 "Stale loop detected - same topic emitted 3+ times consecutively."
2563 }
2564 TerminationReason::ValidationFailure => "Too many consecutive malformed JSONL events.",
2565 TerminationReason::Stopped => "Manually stopped.",
2566 TerminationReason::Interrupted => "Interrupted by signal.",
2567 TerminationReason::RestartRequested => "Restarting by human request.",
2568 TerminationReason::WorkspaceGone => "Workspace directory removed externally.",
2569 TerminationReason::Cancelled => "Cancelled gracefully (human rejection or timeout).",
2570 }
2571}