1mod loop_state;
6#[cfg(test)]
7mod tests;
8
9pub use loop_state::LoopState;
10
11use crate::config::{HatBackend, InjectMode, RalphConfig, ScratchpadConfig};
12use crate::event_parser::{EventParser, MutationEvidence, MutationStatus};
13use crate::event_reader::EventReader;
14use crate::hat_registry::HatRegistry;
15use crate::hatless_ralph::HatlessRalph;
16use crate::instructions::InstructionBuilder;
17use crate::loop_context::LoopContext;
18use crate::memory_store::{MarkdownMemoryStore, format_memories_as_markdown, truncate_to_budget};
19use crate::skill_registry::SkillRegistry;
20use crate::text::floor_char_boundary;
21use ralph_proto::{CheckinContext, Event, EventBus, Hat, HatId, RobotService};
22use serde_json::{Map, Value};
23use std::path::PathBuf;
24use std::sync::Arc;
25use std::sync::atomic::AtomicBool;
26use std::time::Duration;
27use tracing::{debug, info, warn};
28
/// Summary of one pass over newly read events.
///
/// NOTE(review): the code that fills this struct is outside this view; the
/// field descriptions below are inferred from names and nearby helpers.
#[derive(Debug, Clone)]
pub struct ProcessedEvents {
    /// Presumably `true` when at least one event was processed — TODO confirm.
    pub had_events: bool,
    /// Presumably `true` when a `plan.*` topic was seen (compare
    /// `has_pending_plan_events_in_jsonl`) — TODO confirm.
    pub had_plan_events: bool,
    /// Context parsed from a `human.interact` event payload, if one was seen
    /// (compare `pending_human_interact_context_in_jsonl`) — verify against the producer.
    pub human_interact_context: Option<Value>,
    /// NOTE(review): presumably set when some events had no handling hat — confirm.
    pub has_orphans: bool,
}
42
/// [`ProcessedEvents`] together with the raw events set aside for wave handling.
#[derive(Debug)]
pub struct ProcessedEventsWithWaves {
    /// The ordinary processing summary.
    pub processed: ProcessedEvents,
    /// Raw events from the events file that belong to a wave.
    /// NOTE(review): wave semantics are defined outside this view — TODO confirm.
    pub wave_events: Vec<crate::event_reader::Event>,
}
51
/// Why the event loop stopped (or was asked to stop).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationReason {
    /// The completion event was accepted by `EventLoop::check_completion_event`.
    CompletionPromise,
    /// `state.iteration` reached `event_loop.max_iterations`.
    MaxIterations,
    /// Elapsed time reached `event_loop.max_runtime_seconds`.
    MaxRuntime,
    /// Cumulative cost reached `event_loop.max_cost_usd`.
    MaxCost,
    /// Consecutive failures reached `event_loop.max_consecutive_failures`.
    ConsecutiveFailures,
    /// Abandoned tasks were re-dispatched too many times.
    LoopThrashing,
    /// The same event signature was emitted repeatedly.
    LoopStale,
    /// Too many consecutive malformed events.
    ValidationFailure,
    /// A `.ralph/stop-requested` marker was found.
    Stopped,
    /// Presumably an external interrupt such as Ctrl-C (exit code 130) — TODO confirm.
    Interrupted,
    /// A `.ralph/restart-requested` marker was found.
    RestartRequested,
    /// The configured workspace root is no longer a directory.
    WorkspaceGone,
    /// The loop was cancelled via a `loop.cancel` event.
    Cancelled,
}

impl TerminationReason {
    /// Process exit code associated with this reason.
    ///
    /// 0 = graceful (completion or cancellation), 1 = failure or stop,
    /// 2 = a configured limit was hit, 3 = restart requested, 130 = interrupted.
    pub fn exit_code(&self) -> i32 {
        match self {
            Self::CompletionPromise | Self::Cancelled => 0,
            Self::ConsecutiveFailures
            | Self::LoopThrashing
            | Self::LoopStale
            | Self::ValidationFailure
            | Self::Stopped
            | Self::WorkspaceGone => 1,
            Self::MaxIterations | Self::MaxRuntime | Self::MaxCost => 2,
            Self::RestartRequested => 3,
            Self::Interrupted => 130,
        }
    }

    /// Stable snake_case label for logs and telemetry.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::CompletionPromise => "completed",
            Self::MaxIterations => "max_iterations",
            Self::MaxRuntime => "max_runtime",
            Self::MaxCost => "max_cost",
            Self::ConsecutiveFailures => "consecutive_failures",
            Self::LoopThrashing => "loop_thrashing",
            Self::LoopStale => "loop_stale",
            Self::ValidationFailure => "validation_failure",
            Self::Stopped => "stopped",
            Self::Interrupted => "interrupted",
            Self::RestartRequested => "restart_requested",
            Self::WorkspaceGone => "workspace_gone",
            Self::Cancelled => "cancelled",
        }
    }

    /// `true` only for an accepted completion; cancellation exits 0 but is not a success.
    pub fn is_success(&self) -> bool {
        *self == Self::CompletionPromise
    }
}
138
/// Drives Ralph's iterations.
///
/// Routes events between hats on an [`EventBus`], builds per-iteration prompts,
/// and decides when the loop terminates.
pub struct EventLoop {
    /// Loop configuration. The constructors force `core.scratchpad.enabled` on
    /// when no hats are defined.
    config: RalphConfig,
    /// Custom hats built from `config`; empty in solo mode.
    registry: HatRegistry,
    /// Bus holding every registry hat plus the catch-all `ralph` hat (subscribed to `*`).
    bus: EventBus,
    /// Mutable per-loop counters and flags read by the termination checks.
    state: LoopState,
    /// Renders prompts for custom (non-Ralph) hats.
    instruction_builder: InstructionBuilder,
    /// Prompt builder for the Ralph coordinator.
    ralph: HatlessRalph,
    /// Accumulated `human.guidance` payloads that are applied to Ralph's prompt.
    robot_guidance: Vec<String>,
    /// Reader for the events file chosen at construction time.
    pub(crate) event_reader: EventReader,
    /// Sink for prompt, hook-run and orchestration diagnostics (may be disabled).
    diagnostics: crate::diagnostics::DiagnosticsCollector,
    /// Workspace-scoped paths when built via `with_context*`; `None` otherwise.
    loop_context: Option<LoopContext>,
    /// Skills available for prompt injection.
    skill_registry: SkillRegistry,
    /// Optional robot service installed via `set_robot_service`; `None` until then.
    robot_service: Option<Box<dyn RobotService>>,
}
161
162impl EventLoop {
163 pub fn new(config: RalphConfig) -> Self {
165 let diagnostics = crate::diagnostics::DiagnosticsCollector::new(std::path::Path::new("."))
168 .unwrap_or_else(|e| {
169 debug!(
170 "Failed to initialize diagnostics: {}, using disabled collector",
171 e
172 );
173 crate::diagnostics::DiagnosticsCollector::disabled()
174 });
175
176 Self::with_diagnostics(config, diagnostics)
177 }
178
179 pub fn with_context(config: RalphConfig, context: LoopContext) -> Self {
185 let diagnostics = crate::diagnostics::DiagnosticsCollector::new(context.workspace())
186 .unwrap_or_else(|e| {
187 debug!(
188 "Failed to initialize diagnostics: {}, using disabled collector",
189 e
190 );
191 crate::diagnostics::DiagnosticsCollector::disabled()
192 });
193
194 Self::with_context_and_diagnostics(config, context, diagnostics)
195 }
196
197 pub fn with_context_and_diagnostics(
199 mut config: RalphConfig,
200 context: LoopContext,
201 diagnostics: crate::diagnostics::DiagnosticsCollector,
202 ) -> Self {
203 if config.hats.is_empty() && !config.core.scratchpad.enabled {
205 warn!(
206 "core.scratchpad.enabled is false but no hats are defined. \
207 Scratchpad is the only continuity mechanism in solo mode — forcing enabled."
208 );
209 config.core.scratchpad.enabled = true;
210 }
211
212 let registry = HatRegistry::from_config(&config);
213 let instruction_builder =
214 InstructionBuilder::with_events(config.core.clone(), config.events.clone());
215
216 let mut bus = EventBus::new();
217
218 for hat in registry.all() {
222 bus.register(hat.clone());
223 }
224
225 let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); bus.register(ralph_hat);
229
230 if registry.is_empty() {
231 debug!("Solo mode: Ralph is the only coordinator");
232 } else {
233 debug!(
234 "Multi-hat mode: {} custom hats + Ralph as fallback",
235 registry.len()
236 );
237 }
238
239 let mut skill_registry = if config.skills.enabled {
241 SkillRegistry::from_config(
242 &config.skills,
243 context.workspace(),
244 Some(config.cli.backend.as_str()),
245 )
246 .unwrap_or_else(|e| {
247 warn!(
248 "Failed to build skill registry: {}, using empty registry",
249 e
250 );
251 SkillRegistry::new(Some(config.cli.backend.as_str()))
252 })
253 } else {
254 SkillRegistry::new(Some(config.cli.backend.as_str()))
255 };
256
257 if !config.tasks.enabled {
259 skill_registry.remove("ralph-tools-tasks");
260 }
261 if !config.memories.enabled {
262 skill_registry.remove("ralph-tools-memories");
263 }
264
265 let skill_index = if config.skills.enabled {
266 skill_registry.build_index(None)
267 } else {
268 String::new()
269 };
270
271 let ralph = HatlessRalph::new(
273 config.event_loop.completion_promise.clone(),
274 config.core.clone(),
275 ®istry,
276 config.event_loop.starting_event.clone(),
277 )
278 .with_memories_enabled(config.memories.enabled)
279 .with_skill_index(skill_index);
280
281 let events_path = std::fs::read_to_string(context.current_events_marker())
285 .map(|s| {
286 let relative = s.trim();
287 context.workspace().join(relative)
288 })
289 .unwrap_or_else(|_| context.events_path());
290 let event_reader = EventReader::new(&events_path);
291
292 Self {
293 config,
294 registry,
295 bus,
296 state: LoopState::new(),
297 instruction_builder,
298 ralph,
299 robot_guidance: Vec::new(),
300 event_reader,
301 diagnostics,
302 loop_context: Some(context),
303 skill_registry,
304 robot_service: None,
305 }
306 }
307
308 pub fn with_diagnostics(
310 mut config: RalphConfig,
311 diagnostics: crate::diagnostics::DiagnosticsCollector,
312 ) -> Self {
313 if config.hats.is_empty() && !config.core.scratchpad.enabled {
315 warn!(
316 "core.scratchpad.enabled is false but no hats are defined. \
317 Scratchpad is the only continuity mechanism in solo mode — forcing enabled."
318 );
319 config.core.scratchpad.enabled = true;
320 }
321
322 let registry = HatRegistry::from_config(&config);
323 let instruction_builder =
324 InstructionBuilder::with_events(config.core.clone(), config.events.clone());
325
326 let mut bus = EventBus::new();
327
328 for hat in registry.all() {
332 bus.register(hat.clone());
333 }
334
335 let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); bus.register(ralph_hat);
339
340 if registry.is_empty() {
341 debug!("Solo mode: Ralph is the only coordinator");
342 } else {
343 debug!(
344 "Multi-hat mode: {} custom hats + Ralph as fallback",
345 registry.len()
346 );
347 }
348
349 let workspace_root = std::path::Path::new(".");
351 let mut skill_registry = if config.skills.enabled {
352 SkillRegistry::from_config(
353 &config.skills,
354 workspace_root,
355 Some(config.cli.backend.as_str()),
356 )
357 .unwrap_or_else(|e| {
358 warn!(
359 "Failed to build skill registry: {}, using empty registry",
360 e
361 );
362 SkillRegistry::new(Some(config.cli.backend.as_str()))
363 })
364 } else {
365 SkillRegistry::new(Some(config.cli.backend.as_str()))
366 };
367
368 if !config.tasks.enabled {
370 skill_registry.remove("ralph-tools-tasks");
371 }
372 if !config.memories.enabled {
373 skill_registry.remove("ralph-tools-memories");
374 }
375
376 let skill_index = if config.skills.enabled {
377 skill_registry.build_index(None)
378 } else {
379 String::new()
380 };
381
382 let ralph = HatlessRalph::new(
384 config.event_loop.completion_promise.clone(),
385 config.core.clone(),
386 ®istry,
387 config.event_loop.starting_event.clone(),
388 )
389 .with_memories_enabled(config.memories.enabled)
390 .with_skill_index(skill_index);
391
392 let events_path = std::fs::read_to_string(".ralph/current-events")
395 .map(|s| s.trim().to_string())
396 .unwrap_or_else(|_| ".ralph/events.jsonl".to_string());
397 let event_reader = EventReader::new(&events_path);
398
399 Self {
400 config,
401 registry,
402 bus,
403 state: LoopState::new(),
404 instruction_builder,
405 ralph,
406 robot_guidance: Vec::new(),
407 event_reader,
408 diagnostics,
409 loop_context: None,
410 skill_registry,
411 robot_service: None,
412 }
413 }
414
415 pub fn set_robot_service(&mut self, service: Box<dyn RobotService>) {
423 self.robot_service = Some(service);
424 }
425
426 pub fn loop_context(&self) -> Option<&LoopContext> {
428 self.loop_context.as_ref()
429 }
430
431 fn tasks_path(&self) -> PathBuf {
433 self.loop_context
434 .as_ref()
435 .map(|ctx| ctx.tasks_path())
436 .unwrap_or_else(|| PathBuf::from(".ralph/agent/tasks.jsonl"))
437 }
438
439 fn scratchpad_path(&self) -> PathBuf {
446 let active_path = &self.ralph.active_scratchpad().path;
447
448 match self.loop_context.as_ref() {
449 Some(ctx) => ctx.workspace().join(active_path),
450 None => PathBuf::from(active_path),
451 }
452 }
453
454 fn global_scratchpad_path(&self) -> PathBuf {
457 self.loop_context
458 .as_ref()
459 .map(|ctx| ctx.scratchpad_path())
460 .unwrap_or_else(|| PathBuf::from(&self.config.core.scratchpad.path))
461 }
462
463 pub fn state(&self) -> &LoopState {
465 &self.state
466 }
467
468 pub fn reset_stale_topic_counter(&mut self) {
474 self.state.consecutive_same_signature = 0;
475 self.state.last_emitted_signature = None;
476 }
477
478 pub fn config(&self) -> &RalphConfig {
480 &self.config
481 }
482
483 pub fn registry(&self) -> &HatRegistry {
485 &self.registry
486 }
487
488 pub fn log_hook_run_telemetry(&self, entry: crate::diagnostics::HookRunTelemetryEntry) {
490 self.diagnostics.log_hook_run(entry);
491 }
492
493 pub fn log_prompt(&self, iteration: u32, hat: &str, prompt: &str) {
495 self.diagnostics.log_prompt(iteration, hat, prompt);
496 }
497
498 pub fn get_hat_backend(&self, hat_id: &HatId) -> Option<&HatBackend> {
503 self.registry
504 .get_config(hat_id)
505 .and_then(|config| config.backend.as_ref())
506 }
507
508 pub fn add_observer<F>(&mut self, observer: F)
513 where
514 F: Fn(&Event) + Send + 'static,
515 {
516 self.bus.add_observer(observer);
517 }
518
519 #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
523 pub fn set_observer<F>(&mut self, observer: F)
524 where
525 F: Fn(&Event) + Send + 'static,
526 {
527 #[allow(deprecated)]
528 self.bus.set_observer(observer);
529 }
530
531 pub fn check_termination(&self) -> Option<TerminationReason> {
533 let cfg = &self.config.event_loop;
534
535 if self.state.iteration >= cfg.max_iterations {
536 return Some(TerminationReason::MaxIterations);
537 }
538
539 if self.state.elapsed().as_secs() >= cfg.max_runtime_seconds {
540 return Some(TerminationReason::MaxRuntime);
541 }
542
543 if let Some(max_cost) = cfg.max_cost_usd
544 && self.state.cumulative_cost >= max_cost
545 {
546 return Some(TerminationReason::MaxCost);
547 }
548
549 if self.state.consecutive_failures >= cfg.max_consecutive_failures {
550 return Some(TerminationReason::ConsecutiveFailures);
551 }
552
553 if self.state.abandoned_task_redispatches >= 3 {
555 return Some(TerminationReason::LoopThrashing);
556 }
557
558 if self.state.consecutive_malformed_events >= 3 {
560 return Some(TerminationReason::ValidationFailure);
561 }
562
563 if self.state.consecutive_same_signature >= 3 {
565 let topic = self
566 .state
567 .last_emitted_signature
568 .as_ref()
569 .map(|signature| signature.topic.as_str())
570 .unwrap_or("?");
571 warn!(
572 topic,
573 count = self.state.consecutive_same_signature,
574 "Stale loop detected: same event signature emitted consecutively"
575 );
576 return Some(TerminationReason::LoopStale);
577 }
578
579 let stop_path =
581 std::path::Path::new(&self.config.core.workspace_root).join(".ralph/stop-requested");
582 if stop_path.exists() {
583 let _ = std::fs::remove_file(&stop_path);
584 return Some(TerminationReason::Stopped);
585 }
586
587 let restart_path =
589 std::path::Path::new(&self.config.core.workspace_root).join(".ralph/restart-requested");
590 if restart_path.exists() {
591 return Some(TerminationReason::RestartRequested);
592 }
593
594 if !std::path::Path::new(&self.config.core.workspace_root).is_dir() {
596 return Some(TerminationReason::WorkspaceGone);
597 }
598
599 None
600 }
601
602 pub fn check_cancellation_event(&mut self) -> Option<TerminationReason> {
607 if !self.state.cancellation_requested {
608 return None;
609 }
610 self.state.cancellation_requested = false;
611 info!("Loop cancelled gracefully via loop.cancel event");
612
613 self.diagnostics.log_orchestration(
614 self.state.iteration,
615 "loop",
616 crate::diagnostics::OrchestrationEvent::LoopTerminated {
617 reason: "cancelled".to_string(),
618 },
619 );
620
621 Some(TerminationReason::Cancelled)
622 }
623
624 pub fn request_completion_from_text_fallback(&mut self) {
631 self.state.completion_requested = true;
632 info!("Completion requested via text fallback (output contained completion promise)");
633 }
634
    /// Consumes a pending completion request and decides whether the loop may
    /// terminate with [`TerminationReason::CompletionPromise`].
    ///
    /// Returns `None` when no completion was requested, or when the request is
    /// refused. Refusals publish a `task.resume` event explaining why:
    /// * some `event_loop.required_events` have not been seen;
    /// * persistent mode is on;
    /// * with memories enabled, runtime tasks are still open.
    ///
    /// Once a request is seen, the `completion_requested` flag is cleared on
    /// every path.
    pub fn check_completion_event(&mut self) -> Option<TerminationReason> {
        // Nothing to do unless completion was requested.
        if !self.state.completion_requested {
            return None;
        }

        // Refuse completion until every configured required event has been seen.
        let required = &self.config.event_loop.required_events;
        if !required.is_empty() {
            let missing = self.state.missing_required_events(required);
            if !missing.is_empty() {
                warn!(
                    missing = ?missing,
                    "Rejecting LOOP_COMPLETE: required events not seen during loop lifetime"
                );
                self.state.completion_requested = false;

                // Tell the agent why completion was refused and how to abort instead.
                let resume_payload = format!(
                    "LOOP_COMPLETE rejected: missing required events: {:?}. \
                     The agent must complete all workflow phases before emitting LOOP_COMPLETE. \
                     Use loop.cancel to abort the workflow instead.",
                    missing
                );
                self.bus.publish(Event::new("task.resume", resume_payload));
                return None;
            }
        }

        // The request is consumed from here on, whatever the outcome below.
        self.state.completion_requested = false;

        // Persistent mode never terminates on completion; it resumes the agent instead.
        if self.config.event_loop.persistent {
            info!("Completion event suppressed - persistent mode active, loop staying alive");

            // NOTE(review): this records a `LoopTerminated` orchestration event even
            // though the loop keeps running; only the reason string tells them apart.
            self.diagnostics.log_orchestration(
                self.state.iteration,
                "loop",
                crate::diagnostics::OrchestrationEvent::LoopTerminated {
                    reason: "completion_event_suppressed_persistent".to_string(),
                },
            );

            let resume_event = Event::new(
                "task.resume",
                "Persistent mode: loop staying alive after completion signal. \
                 Check for new tasks or await human guidance.",
            );
            self.bus.publish(resume_event);

            return None;
        }

        // NOTE(review): runtime-task verification is gated on `memories.enabled`
        // rather than `tasks.enabled`; confirm this coupling is intentional.
        // In both branches, an `Err` from the verifier does not block completion.
        if self.config.memories.enabled {
            // Refuse completion while runtime tasks remain open.
            if let Ok(false) = self.verify_tasks_complete() {
                let open_tasks = self.get_open_task_list();
                warn!(
                    open_tasks = ?open_tasks,
                    "Rejecting completion event with {} open task(s)",
                    open_tasks.len()
                );
                self.bus.publish(Event::new(
                    "task.resume",
                    format!(
                        "Completion rejected: runtime tasks remain open: {:?}. Close, fail, or reopen outstanding tasks before emitting the completion promise.",
                        open_tasks
                    ),
                ));
                return None;
            }
        } else if let Ok(false) = self.verify_scratchpad_complete() {
            // Scratchpad mode: pending items are only logged; they do not block completion.
            warn!("Completion event with pending scratchpad tasks - trusting agent decision");
        }

        info!("Completion event detected - terminating");

        self.diagnostics.log_orchestration(
            self.state.iteration,
            "loop",
            crate::diagnostics::OrchestrationEvent::LoopTerminated {
                reason: "completion_event".to_string(),
            },
        );

        Some(TerminationReason::CompletionPromise)
    }
727
728 pub fn initialize(&mut self, prompt_content: &str) {
730 let topic = self
732 .config
733 .event_loop
734 .starting_event
735 .clone()
736 .unwrap_or_else(|| "task.start".to_string());
737 self.initialize_with_topic(&topic, prompt_content);
738 }
739
740 pub fn initialize_resume(&mut self, prompt_content: &str) {
745 self.initialize_with_topic("task.resume", prompt_content);
747 }
748
749 fn initialize_with_topic(&mut self, topic: &str, prompt_content: &str) {
751 self.ralph.set_objective(prompt_content.to_string());
755
756 let start_event = Event::new(topic, prompt_content);
757 self.bus.publish(start_event);
758 debug!(topic = topic, "Published {} event", topic);
759 }
760
761 pub fn next_hat(&self) -> Option<&HatId> {
770 let next = self.bus.next_hat_with_pending();
771
772 if next.is_none() && self.bus.has_human_pending() {
774 return self.bus.hat_ids().find(|id| id.as_str() == "ralph");
775 }
776
777 next.as_ref()?;
779
780 if self.registry.is_empty() {
783 next
785 } else {
786 self.bus.hat_ids().find(|id| id.as_str() == "ralph")
789 }
790 }
791
792 pub fn sync_event_reader_to_file_end(&mut self) {
799 let path = self.event_reader.path();
800 if let Ok(metadata) = std::fs::metadata(path) {
801 self.event_reader.set_position(metadata.len());
802 }
803 }
804
805 pub fn has_pending_events(&self) -> bool {
810 self.bus.next_hat_with_pending().is_some() || self.bus.has_human_pending()
811 }
812
813 pub fn has_pending_human_events(&self) -> bool {
818 self.bus.has_human_pending()
819 }
820
821 pub fn inject_human_guidance<I, S>(&mut self, messages: I)
826 where
827 I: IntoIterator<Item = S>,
828 S: Into<String>,
829 {
830 for message in messages {
831 let event = Event::new("human.guidance", message.into());
832 self.state.record_event(&event);
833 self.bus.publish(event);
834 }
835 }
836
837 pub fn has_pending_plan_events_in_jsonl(&self) -> std::io::Result<bool> {
842 let result = self.event_reader.peek_new_events()?;
843 Ok(result
844 .events
845 .iter()
846 .any(|event| event.topic.starts_with("plan.")))
847 }
848
849 pub fn pending_human_interact_context_in_jsonl(&self) -> std::io::Result<Option<Value>> {
852 let result = self.event_reader.peek_new_events()?;
853 Ok(result
854 .events
855 .iter()
856 .find(|event| event.topic == "human.interact")
857 .map(|event| {
858 Self::parse_human_interact_context(event.payload.as_deref().unwrap_or_default())
859 }))
860 }
861
862 pub fn get_hat_publishes(&self, hat_id: &HatId) -> Vec<String> {
866 self.registry
867 .get(hat_id)
868 .map(|hat| hat.publishes.iter().map(|t| t.to_string()).collect())
869 .unwrap_or_default()
870 }
871
872 pub fn inject_fallback_event(&mut self) -> bool {
879 let fallback_event = match &self.state.last_hat {
882 Some(hat_id) if hat_id.as_str() != "ralph" => {
883 let publishes = self.get_hat_publishes(hat_id);
884 let payload = if publishes.is_empty() {
885 format!(
886 "RECOVERY: Previous iteration by hat `{}` did not publish an event. \
887 Emit exactly one valid next event via `ralph emit`, or stop only after \
888 publishing the configured completion event.",
889 hat_id.as_str()
890 )
891 } else {
892 format!(
893 "RECOVERY: Previous iteration by hat `{}` did not publish an event. \
894 This failed because no event was emitted. Emit exactly ONE valid next \
895 event via `ralph emit`. Allowed topics: `{}`. Do not only write prose \
896 or update files. Stop immediately after emitting.",
897 hat_id.as_str(),
898 publishes.join("`, `")
899 )
900 };
901
902 debug!(
903 hat = %hat_id.as_str(),
904 "Injecting fallback event to recover - targeting last hat with task.resume"
905 );
906 Event::new("task.resume", payload).with_target(hat_id.clone())
907 }
908 _ => {
909 debug!("Injecting fallback event to recover - triggering Ralph with task.resume");
910 Event::new(
911 "task.resume",
912 "RECOVERY: Previous iteration did not publish an event. \
913 Review the scratchpad and either dispatch the next task or complete the loop.",
914 )
915 }
916 };
917
918 self.bus.publish(fallback_event);
919 true
920 }
921
922 pub fn build_prompt(&mut self, hat_id: &HatId) -> Option<String> {
936 if hat_id.as_str() == "ralph" {
939 if self.registry.is_empty() {
940 let mut events = self.bus.take_pending(&hat_id.clone());
942 let mut human_events = self.bus.take_human_pending();
943 events.append(&mut human_events);
944
945 let (guidance_events, regular_events): (Vec<_>, Vec<_>) = events
947 .into_iter()
948 .partition(|e| e.topic.as_str() == "human.guidance");
949
950 let events_context = regular_events
951 .iter()
952 .map(|e| Self::format_event(e))
953 .collect::<Vec<_>>()
954 .join("\n");
955
956 self.ralph
958 .set_active_scratchpad(self.config.core.scratchpad.clone());
959 self.ralph.set_iteration(self.state.iteration);
960
961 self.update_robot_guidance(guidance_events);
963 self.apply_robot_guidance();
964
965 let base_prompt = self.ralph.build_prompt(&events_context, &[]);
967 self.ralph.clear_robot_guidance();
968 let with_skills = self.prepend_auto_inject_skills(base_prompt);
969 let with_scratchpad = self.prepend_scratchpad(with_skills);
970 let final_prompt = self.prepend_ready_tasks(with_scratchpad);
971
972 debug!("build_prompt: routing to HatlessRalph (solo mode)");
973 return Some(final_prompt);
974 } else {
975 let mut all_hat_ids: Vec<HatId> = self.bus.hat_ids().cloned().collect();
977 all_hat_ids.sort_by(|a, b| a.as_str().cmp(b.as_str()));
979
980 let mut all_events = Vec::new();
981 let mut system_events = Vec::new();
982
983 for id in &all_hat_ids {
984 let pending = self.bus.take_pending(id);
985 if pending.is_empty() {
986 continue;
987 }
988
989 let (drop_pending, exhausted_event) = self.check_hat_exhaustion(id, &pending);
990 if drop_pending {
991 if let Some(exhausted_event) = exhausted_event {
993 all_events.push(exhausted_event.clone());
994 system_events.push(exhausted_event);
995 }
996 continue;
997 }
998
999 all_events.extend(pending);
1000 }
1001
1002 let mut human_events = self.bus.take_human_pending();
1003 all_events.append(&mut human_events);
1004
1005 for event in system_events {
1008 self.bus.publish(event);
1009 }
1010
1011 let (guidance_events, regular_events): (Vec<_>, Vec<_>) = all_events
1013 .into_iter()
1014 .partition(|e| e.topic.as_str() == "human.guidance");
1015
1016 let effective_regular_events = self.effective_regular_events(®ular_events);
1018
1019 let active_hat_ids = self.determine_active_hat_ids(®ular_events);
1021 self.record_hat_activations(&active_hat_ids);
1022 self.state.last_active_hat_ids = active_hat_ids.clone();
1023
1024 let resolved_scratchpad = if let Some(hat_id) = active_hat_ids.first() {
1028 let hat_scratchpad = self
1029 .registry
1030 .get_config(hat_id)
1031 .and_then(|c| c.scratchpad.as_ref());
1032 ScratchpadConfig::resolve(hat_scratchpad, &self.config.core.scratchpad)
1033 } else {
1034 self.config.core.scratchpad.clone()
1036 };
1037 self.ralph.set_active_scratchpad(resolved_scratchpad);
1038 self.ralph.set_iteration(self.state.iteration);
1039
1040 self.update_robot_guidance(guidance_events);
1043 self.apply_robot_guidance();
1044
1045 let active_hats = self.determine_active_hats(®ular_events);
1046
1047 let events_context = effective_regular_events
1049 .iter()
1050 .map(|e| Self::format_event(e))
1051 .collect::<Vec<_>>()
1052 .join("\n");
1053
1054 let base_prompt = self.ralph.build_prompt(&events_context, &active_hats);
1056
1057 debug!(
1059 "build_prompt: routing to HatlessRalph (multi-hat coordinator mode), active_hats: {:?}",
1060 active_hats
1061 .iter()
1062 .map(|h| h.id.as_str())
1063 .collect::<Vec<_>>()
1064 );
1065
1066 self.ralph.clear_robot_guidance();
1068 let with_skills = self.prepend_auto_inject_skills(base_prompt);
1069 let with_scratchpad = self.prepend_scratchpad(with_skills);
1070 let final_prompt = self.prepend_ready_tasks(with_scratchpad);
1071
1072 return Some(final_prompt);
1073 }
1074 }
1075
1076 let events = self.bus.take_pending(&hat_id.clone());
1080 let events_context = events
1081 .iter()
1082 .map(|e| Self::format_event(e))
1083 .collect::<Vec<_>>()
1084 .join("\n");
1085
1086 let hat = self.registry.get(hat_id)?;
1087
1088 debug!(
1090 "build_prompt: hat_id='{}', instructions.is_empty()={}",
1091 hat_id.as_str(),
1092 hat.instructions.is_empty()
1093 );
1094
1095 debug!(
1097 "build_prompt: routing to build_custom_hat() for '{}'",
1098 hat_id.as_str()
1099 );
1100 Some(
1101 self.instruction_builder
1102 .build_custom_hat(hat, &events_context),
1103 )
1104 }
1105
1106 fn update_robot_guidance(&mut self, guidance_events: Vec<Event>) {
1112 if guidance_events.is_empty() {
1113 return;
1114 }
1115
1116 self.persist_guidance_to_scratchpad(&guidance_events);
1118
1119 self.robot_guidance
1120 .extend(guidance_events.into_iter().map(|e| e.payload));
1121 }
1122
1123 fn persist_guidance_to_scratchpad(&self, guidance_events: &[Event]) {
1132 use std::io::Write;
1133
1134 let scratchpad_path = if self.ralph.active_scratchpad().enabled {
1136 self.scratchpad_path()
1137 } else {
1138 if !self.config.core.scratchpad.enabled {
1139 debug!("Both hat and global scratchpad disabled, skipping guidance persistence");
1140 return;
1141 }
1142 self.global_scratchpad_path()
1143 };
1144 let resolved_path = if scratchpad_path.is_relative() {
1145 self.config.core.workspace_root.join(&scratchpad_path)
1146 } else {
1147 scratchpad_path
1148 };
1149
1150 if let Some(parent) = resolved_path.parent()
1152 && !parent.exists()
1153 && let Err(e) = std::fs::create_dir_all(parent)
1154 {
1155 warn!("Failed to create scratchpad directory: {}", e);
1156 return;
1157 }
1158
1159 let mut file = match std::fs::OpenOptions::new()
1160 .create(true)
1161 .append(true)
1162 .open(&resolved_path)
1163 {
1164 Ok(f) => f,
1165 Err(e) => {
1166 warn!("Failed to open scratchpad for guidance persistence: {}", e);
1167 return;
1168 }
1169 };
1170
1171 let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
1172 for event in guidance_events {
1173 let entry = format!(
1174 "\n### HUMAN GUIDANCE ({})\n\n{}\n",
1175 timestamp, event.payload
1176 );
1177 if let Err(e) = file.write_all(entry.as_bytes()) {
1178 warn!("Failed to write guidance to scratchpad: {}", e);
1179 }
1180 }
1181
1182 info!(
1183 count = guidance_events.len(),
1184 "Persisted human guidance to scratchpad"
1185 );
1186 }
1187
1188 fn apply_robot_guidance(&mut self) {
1190 if self.robot_guidance.is_empty() {
1191 return;
1192 }
1193
1194 self.ralph.set_robot_guidance(self.robot_guidance.clone());
1195 }
1196
1197 fn prepend_auto_inject_skills(&self, prompt: String) -> String {
1207 let mut prefix = String::new();
1208
1209 self.inject_memories_and_tools_skill(&mut prefix);
1211
1212 self.inject_robot_skill(&mut prefix);
1214
1215 self.inject_custom_auto_skills(&mut prefix);
1217
1218 if prefix.is_empty() {
1219 return prompt;
1220 }
1221
1222 prefix.push_str("\n\n");
1223 prefix.push_str(&prompt);
1224 prefix
1225 }
1226
1227 fn inject_memories_and_tools_skill(&self, prefix: &mut String) {
1235 let memories_config = &self.config.memories;
1236
1237 if memories_config.enabled && memories_config.inject == InjectMode::Auto {
1239 info!(
1240 "Memory injection check: enabled={}, inject={:?}, workspace_root={:?}",
1241 memories_config.enabled, memories_config.inject, self.config.core.workspace_root
1242 );
1243
1244 let workspace_root = &self.config.core.workspace_root;
1245 let store = MarkdownMemoryStore::with_default_path(workspace_root);
1246 let memories_path = workspace_root.join(".ralph/agent/memories.md");
1247
1248 info!(
1249 "Looking for memories at: {:?} (exists: {})",
1250 memories_path,
1251 memories_path.exists()
1252 );
1253
1254 let memories = match store.load() {
1255 Ok(memories) => {
1256 info!("Successfully loaded {} memories from store", memories.len());
1257 memories
1258 }
1259 Err(e) => {
1260 info!(
1261 "Failed to load memories for injection: {} (path: {:?})",
1262 e, memories_path
1263 );
1264 Vec::new()
1265 }
1266 };
1267
1268 if memories.is_empty() {
1269 info!("Memory store is empty - no memories to inject");
1270 } else {
1271 let mut memories_content = format_memories_as_markdown(&memories);
1272
1273 if memories_config.budget > 0 {
1274 let original_len = memories_content.len();
1275 memories_content =
1276 truncate_to_budget(&memories_content, memories_config.budget);
1277 debug!(
1278 "Applied budget: {} chars -> {} chars (budget: {})",
1279 original_len,
1280 memories_content.len(),
1281 memories_config.budget
1282 );
1283 }
1284
1285 info!(
1286 "Injecting {} memories ({} chars) into prompt",
1287 memories.len(),
1288 memories_content.len()
1289 );
1290
1291 prefix.push_str(&memories_content);
1292 }
1293 }
1294
1295 let tasks_enabled = self.config.tasks.enabled;
1297
1298 if (memories_config.enabled || tasks_enabled)
1300 && let Some(skill) = self.skill_registry.get("ralph-tools")
1301 {
1302 if !prefix.is_empty() {
1303 prefix.push_str("\n\n");
1304 }
1305 prefix.push_str(&format!(
1306 "<ralph-tools-skill>\n{}\n</ralph-tools-skill>",
1307 skill.content.trim()
1308 ));
1309 debug!("Injected ralph-tools skill from registry");
1310 }
1311
1312 if tasks_enabled && let Some(skill) = self.skill_registry.get("ralph-tools-tasks") {
1314 if !prefix.is_empty() {
1315 prefix.push_str("\n\n");
1316 }
1317 prefix.push_str(&format!(
1318 "<ralph-tools-tasks-skill>\n{}\n</ralph-tools-tasks-skill>",
1319 skill.content.trim()
1320 ));
1321 debug!("Injected ralph-tools-tasks skill from registry");
1322 }
1323
1324 if memories_config.enabled
1326 && let Some(skill) = self.skill_registry.get("ralph-tools-memories")
1327 {
1328 if !prefix.is_empty() {
1329 prefix.push_str("\n\n");
1330 }
1331 prefix.push_str(&format!(
1332 "<ralph-tools-memories-skill>\n{}\n</ralph-tools-memories-skill>",
1333 skill.content.trim()
1334 ));
1335 debug!("Injected ralph-tools-memories skill from registry");
1336 }
1337 }
1338
1339 fn inject_robot_skill(&self, prefix: &mut String) {
1344 if !self.config.robot.enabled {
1345 return;
1346 }
1347
1348 if let Some(skill) = self.skill_registry.get("robot-interaction") {
1349 if !prefix.is_empty() {
1350 prefix.push_str("\n\n");
1351 }
1352 prefix.push_str(&format!(
1353 "<robot-skill>\n{}\n</robot-skill>",
1354 skill.content.trim()
1355 ));
1356 debug!("Injected robot interaction skill from registry");
1357 }
1358 }
1359
1360 fn inject_custom_auto_skills(&self, prefix: &mut String) {
1362 for skill in self.skill_registry.auto_inject_skills(None) {
1363 if matches!(
1365 skill.name.as_str(),
1366 "ralph-tools" | "ralph-tools-tasks" | "ralph-tools-memories" | "robot-interaction"
1367 ) {
1368 continue;
1369 }
1370
1371 if !prefix.is_empty() {
1372 prefix.push_str("\n\n");
1373 }
1374 prefix.push_str(&format!(
1375 "<{name}-skill>\n{content}\n</{name}-skill>",
1376 name = skill.name,
1377 content = skill.content.trim()
1378 ));
1379 debug!("Injected auto-inject skill: {}", skill.name);
1380 }
1381 }
1382
1383 fn prepend_scratchpad(&self, prompt: String) -> String {
1389 if !self.ralph.active_scratchpad().enabled {
1391 return prompt;
1392 }
1393
1394 let scratchpad_path = self.scratchpad_path();
1395
1396 let resolved_path = if scratchpad_path.is_relative() {
1397 self.config.core.workspace_root.join(&scratchpad_path)
1398 } else {
1399 scratchpad_path
1400 };
1401
1402 if !resolved_path.exists() {
1403 debug!(
1404 "Scratchpad not found at {:?}, skipping injection",
1405 resolved_path
1406 );
1407 return prompt;
1408 }
1409
1410 let content = match std::fs::read_to_string(&resolved_path) {
1411 Ok(c) => c,
1412 Err(e) => {
1413 info!("Failed to read scratchpad for injection: {}", e);
1414 return prompt;
1415 }
1416 };
1417
1418 if content.trim().is_empty() {
1419 debug!("Scratchpad is empty, skipping injection");
1420 return prompt;
1421 }
1422
1423 let char_budget = 4000 * 4;
1425 let content = if content.len() > char_budget {
1426 let start = content.len() - char_budget;
1428 let start = floor_char_boundary(&content, start);
1430 let line_start = content[start..].find('\n').map_or(start, |n| start + n + 1);
1431 let discarded = &content[..line_start];
1432
1433 let headings: Vec<&str> = discarded
1435 .lines()
1436 .filter(|line| line.starts_with('#'))
1437 .collect();
1438 let summary = if headings.is_empty() {
1439 format!(
1440 "<!-- earlier content truncated ({} chars omitted) -->",
1441 line_start
1442 )
1443 } else {
1444 format!(
1445 "<!-- earlier content truncated ({} chars omitted) -->\n\
1446 <!-- discarded sections: {} -->",
1447 line_start,
1448 headings.join(" | ")
1449 )
1450 };
1451
1452 format!("{}\n\n{}", summary, &content[line_start..])
1453 } else {
1454 content
1455 };
1456
1457 info!("Injecting scratchpad ({} chars) into prompt", content.len());
1458
1459 let mut final_prompt = format!(
1460 "<scratchpad path=\"{}\">\n{}\n</scratchpad>\n\n",
1461 self.ralph.active_scratchpad().path,
1462 content
1463 );
1464 final_prompt.push_str(&prompt);
1465 final_prompt
1466 }
1467
1468 fn prepend_ready_tasks(&self, prompt: String) -> String {
1474 if !self.config.tasks.enabled {
1475 return prompt;
1476 }
1477
1478 use crate::task::TaskStatus;
1479 use crate::task_store::TaskStore;
1480
1481 let tasks_path = self.tasks_path();
1482 let resolved_path = if tasks_path.is_relative() {
1483 self.config.core.workspace_root.join(&tasks_path)
1484 } else {
1485 tasks_path
1486 };
1487
1488 if !resolved_path.exists() {
1489 return prompt;
1490 }
1491
1492 let store = match TaskStore::load(&resolved_path) {
1493 Ok(s) => s,
1494 Err(e) => {
1495 info!("Failed to load task store for injection: {}", e);
1496 return prompt;
1497 }
1498 };
1499
1500 let current_loop_id = self.current_loop_id();
1501
1502 let ready = Self::filter_tasks_by_loop(store.ready(), current_loop_id.as_deref());
1503 let open = Self::filter_tasks_by_loop(store.open(), current_loop_id.as_deref());
1504 let all_count =
1505 Self::filter_tasks_by_loop(store.all().iter().collect(), current_loop_id.as_deref())
1506 .len();
1507 let closed_count = all_count - open.len();
1508
1509 if open.is_empty() && closed_count == 0 {
1510 return prompt;
1511 }
1512
1513 let mut section = String::from("<ready-tasks>\n");
1514 if ready.is_empty() && open.is_empty() {
1515 section.push_str("No open tasks. Create tasks with `ralph tools task add`.\n");
1516 } else {
1517 section.push_str(&format!(
1518 "## Tasks: {} ready, {} open, {} closed\n\n",
1519 ready.len(),
1520 open.len(),
1521 closed_count
1522 ));
1523 for task in &ready {
1524 let status_icon = match task.status {
1525 TaskStatus::Open => "[ ]",
1526 TaskStatus::InProgress => "[~]",
1527 _ => "[?]",
1528 };
1529 section.push_str(&format!(
1530 "- {} [P{}] {} ({}){}\n",
1531 status_icon,
1532 task.priority,
1533 task.title,
1534 task.id,
1535 task.key
1536 .as_deref()
1537 .map(|key| format!(" — key: {key}"))
1538 .unwrap_or_default()
1539 ));
1540 }
1541 let ready_ids: Vec<&str> = ready.iter().map(|t| t.id.as_str()).collect();
1543 let blocked: Vec<_> = open
1544 .iter()
1545 .filter(|t| !ready_ids.contains(&t.id.as_str()))
1546 .collect();
1547 if !blocked.is_empty() {
1548 section.push_str("\nBlocked:\n");
1549 for task in blocked {
1550 section.push_str(&format!(
1551 "- [blocked] [P{}] {} ({}){} — blocked by: {}\n",
1552 task.priority,
1553 task.title,
1554 task.id,
1555 task.key
1556 .as_deref()
1557 .map(|key| format!(" — key: {key}"))
1558 .unwrap_or_default(),
1559 task.blocked_by.join(", ")
1560 ));
1561 }
1562 }
1563 }
1564 section.push_str("</ready-tasks>\n\n");
1565
1566 info!(
1567 "Injecting ready tasks ({} ready, {} open, {} closed) into prompt",
1568 ready.len(),
1569 open.len(),
1570 closed_count
1571 );
1572
1573 let mut final_prompt = section;
1574 final_prompt.push_str(&prompt);
1575 final_prompt
1576 }
1577
1578 pub fn build_ralph_prompt(&self, prompt_content: &str) -> String {
1580 self.ralph.build_prompt(prompt_content, &[])
1581 }
1582
1583 fn determine_active_hats(&self, events: &[Event]) -> Vec<&Hat> {
1586 let mut active_hats = Vec::new();
1587 for id in self.determine_active_hat_ids(events) {
1588 if let Some(hat) = self.registry.get(&id) {
1589 active_hats.push(hat);
1590 }
1591 }
1592 active_hats
1593 }
1594
1595 fn determine_active_hat_ids(&self, events: &[Event]) -> Vec<HatId> {
1596 let mut entrypoint_hat_ids = Vec::new();
1597 let mut progressed_hat_ids = Vec::new();
1598 for event in events {
1599 let hat_id = if let Some(target) = &event.target
1601 && self.registry.get(target).is_some()
1602 {
1603 target.clone()
1604 } else if let Some(hat) = self.registry.get_for_topic(event.topic.as_str()) {
1605 hat.id.clone()
1606 } else {
1607 continue;
1608 };
1609
1610 let list = if self.is_entrypoint_topic(event.topic.as_str()) {
1611 &mut entrypoint_hat_ids
1612 } else {
1613 &mut progressed_hat_ids
1614 };
1615 if !list.iter().any(|id| id == &hat_id) {
1616 list.push(hat_id);
1617 }
1618 }
1619 if progressed_hat_ids.is_empty() {
1625 entrypoint_hat_ids
1626 } else {
1627 progressed_hat_ids
1628 }
1629 }
1630
1631 fn effective_regular_events<'a>(&self, events: &'a [Event]) -> Vec<&'a Event> {
1632 let has_downstream_event = events
1633 .iter()
1634 .any(|event| !Self::is_kickoff_or_recovery_event(event.topic.as_str()));
1635 events
1636 .iter()
1637 .filter(|event| {
1638 !has_downstream_event || !Self::is_kickoff_or_recovery_event(event.topic.as_str())
1639 })
1640 .collect()
1641 }
1642
1643 fn is_kickoff_or_recovery_event(topic: &str) -> bool {
1644 topic == "task.start" || topic == "task.resume" || topic.strip_suffix(".start").is_some()
1645 }
1646
1647 fn is_entrypoint_topic(&self, topic: &str) -> bool {
1648 topic == "task.start"
1649 || topic == "task.resume"
1650 || topic.strip_suffix(".start").is_some()
1651 || self.config.event_loop.starting_event.as_deref() == Some(topic)
1652 }
1653
1654 fn peek_pending_regular_events(&self) -> Vec<Event> {
1655 let mut events = Vec::new();
1656 for hat_id in self.bus.hat_ids() {
1657 if let Some(pending) = self.bus.peek_pending(hat_id) {
1658 events.extend(pending.iter().cloned());
1659 }
1660 }
1661 events
1662 }
1663
1664 fn format_event(event: &Event) -> String {
1669 let topic = &event.topic;
1670 let payload = &event.payload;
1671
1672 if topic.as_str() == "task.start" || topic.as_str() == "task.resume" {
1673 format!(
1674 "Event: {} - <top-level-prompt>\n{}\n</top-level-prompt>",
1675 topic, payload
1676 )
1677 } else {
1678 format!("Event: {} - {}", topic, payload)
1679 }
1680 }
1681
1682 fn check_hat_exhaustion(&mut self, hat_id: &HatId, dropped: &[Event]) -> (bool, Option<Event>) {
1683 let Some(config) = self.registry.get_config(hat_id) else {
1684 return (false, None);
1685 };
1686 let Some(max) = config.max_activations else {
1687 return (false, None);
1688 };
1689
1690 let count = *self.state.hat_activation_counts.get(hat_id).unwrap_or(&0);
1691 if count < max {
1692 return (false, None);
1693 }
1694
1695 let should_emit = self.state.exhausted_hats.insert(hat_id.clone());
1697
1698 if !should_emit {
1699 return (true, None);
1701 }
1702
1703 let mut dropped_topics: Vec<String> = dropped.iter().map(|e| e.topic.to_string()).collect();
1704 dropped_topics.sort();
1705
1706 let payload = format!(
1707 "Hat '{hat}' exhausted.\n- max_activations: {max}\n- activations: {count}\n- dropped_topics:\n - {topics}",
1708 hat = hat_id.as_str(),
1709 max = max,
1710 count = count,
1711 topics = dropped_topics.join("\n - ")
1712 );
1713
1714 warn!(
1715 hat = %hat_id.as_str(),
1716 max_activations = max,
1717 activations = count,
1718 "Hat exhausted (max_activations reached)"
1719 );
1720
1721 (
1722 true,
1723 Some(Event::new(
1724 format!("{}.exhausted", hat_id.as_str()),
1725 payload,
1726 )),
1727 )
1728 }
1729
1730 fn record_hat_activations(&mut self, active_hat_ids: &[HatId]) {
1731 for hat_id in active_hat_ids {
1732 *self
1733 .state
1734 .hat_activation_counts
1735 .entry(hat_id.clone())
1736 .or_insert(0) += 1;
1737 }
1738 }
1739
1740 pub fn get_active_hat_id(&self) -> HatId {
1744 let pending_events = self.peek_pending_regular_events();
1745 if let Some(active_hat_id) = self
1746 .determine_active_hat_ids(&pending_events)
1747 .into_iter()
1748 .next()
1749 {
1750 return active_hat_id;
1751 }
1752 HatId::new("ralph")
1753 }
1754
1755 pub fn check_default_publishes(&mut self, hat_id: &HatId) {
1765 if let Some(config) = self.registry.get_config(hat_id)
1766 && let Some(default_topic) = &config.default_publishes
1767 {
1768 let default_event = Event::new(default_topic.as_str(), "").with_source(hat_id.clone());
1769
1770 debug!(
1771 hat = %hat_id.as_str(),
1772 topic = %default_topic,
1773 "No events written by hat, injecting default_publishes event"
1774 );
1775
1776 self.state.record_event(&default_event);
1777
1778 if default_topic.as_str() == self.config.event_loop.completion_promise {
1782 info!(
1783 hat = %hat_id.as_str(),
1784 topic = %default_topic,
1785 "default_publishes matches completion_promise — requesting termination"
1786 );
1787 self.state.completion_requested = true;
1788 }
1789
1790 self.bus.publish(default_event);
1791 }
1792 }
1793
    /// Mutable access to the loop's event bus.
    pub fn bus(&mut self) -> &mut EventBus {
        &mut self.bus
    }
1801
1802 pub fn process_output(
1806 &mut self,
1807 hat_id: &HatId,
1808 output: &str,
1809 success: bool,
1810 ) -> Option<TerminationReason> {
1811 self.state.iteration += 1;
1812 self.state.last_hat = Some(hat_id.clone());
1813
1814 if let Some(interval_secs) = self.config.robot.checkin_interval_seconds
1816 && let Some(ref robot_service) = self.robot_service
1817 {
1818 let elapsed = self.state.elapsed();
1819 let interval = std::time::Duration::from_secs(interval_secs);
1820 let last = self
1821 .state
1822 .last_checkin_at
1823 .map(|t| t.elapsed())
1824 .unwrap_or(elapsed);
1825
1826 if last >= interval {
1827 let context = self.build_checkin_context(hat_id);
1828 match robot_service.send_checkin(self.state.iteration, elapsed, Some(&context)) {
1829 Ok(_) => {
1830 self.state.last_checkin_at = Some(std::time::Instant::now());
1831 debug!(iteration = self.state.iteration, "Sent robot check-in");
1832 }
1833 Err(e) => {
1834 warn!(error = %e, "Failed to send robot check-in");
1835 }
1836 }
1837 }
1838 }
1839
1840 self.diagnostics.log_orchestration(
1842 self.state.iteration,
1843 "loop",
1844 crate::diagnostics::OrchestrationEvent::IterationStarted,
1845 );
1846
1847 self.diagnostics.log_orchestration(
1849 self.state.iteration,
1850 "loop",
1851 crate::diagnostics::OrchestrationEvent::HatSelected {
1852 hat: hat_id.to_string(),
1853 reason: "process_output".to_string(),
1854 },
1855 );
1856
1857 if success {
1859 self.state.consecutive_failures = 0;
1860 } else {
1861 self.state.consecutive_failures += 1;
1862 }
1863
1864 let _ = output;
1865
1866 self.audit_file_modifications(hat_id);
1869
1870 self.check_termination()
1876 }
1877
1878 fn audit_file_modifications(&mut self, hat_id: &HatId) {
1884 let config = match self.registry.get_config(hat_id) {
1885 Some(c) => c,
1886 None => return,
1887 };
1888
1889 let has_write_restriction = config
1890 .disallowed_tools
1891 .iter()
1892 .any(|t| t == "Edit" || t == "Write");
1893
1894 if !has_write_restriction {
1895 return;
1896 }
1897
1898 let workspace = &self.config.core.workspace_root;
1899 let diff_output = std::process::Command::new("git")
1900 .args(["diff", "--stat", "HEAD"])
1901 .current_dir(workspace)
1902 .output();
1903
1904 match diff_output {
1905 Ok(output) if !output.stdout.is_empty() => {
1906 let diff_stat = String::from_utf8_lossy(&output.stdout).trim().to_string();
1907 warn!(
1908 hat = %hat_id.as_str(),
1909 diff = %diff_stat,
1910 "Hat modified files despite tool restrictions (scope violation)"
1911 );
1912
1913 let violation_topic = format!("{}.scope_violation", hat_id.as_str());
1914 let violation = Event::new(
1915 violation_topic.as_str(),
1916 format!(
1917 "Hat '{}' modified files with Edit/Write disallowed:\n{}",
1918 hat_id.as_str(),
1919 diff_stat
1920 ),
1921 );
1922 self.bus.publish(violation);
1923 }
1924 Err(e) => {
1925 debug!(error = %e, "Could not run git diff for file-modification audit");
1926 }
1927 _ => {} }
1929 }
1930
1931 fn extract_task_id(payload: &str) -> String {
1934 payload
1935 .lines()
1936 .next()
1937 .unwrap_or("unknown")
1938 .trim()
1939 .to_string()
1940 }
1941
1942 pub fn add_cost(&mut self, cost: f64) {
1944 self.state.cumulative_cost += cost;
1945 }
1946
1947 fn verify_scratchpad_complete(&self) -> Result<bool, std::io::Error> {
1954 if !self.ralph.active_scratchpad().enabled {
1956 return Ok(true);
1957 }
1958
1959 let scratchpad_path = self.scratchpad_path();
1960
1961 if !scratchpad_path.exists() {
1962 return Err(std::io::Error::new(
1963 std::io::ErrorKind::NotFound,
1964 "Scratchpad does not exist",
1965 ));
1966 }
1967
1968 let content = std::fs::read_to_string(scratchpad_path)?;
1969
1970 let has_pending = content
1971 .lines()
1972 .any(|line| line.trim_start().starts_with("- [ ]"));
1973
1974 Ok(!has_pending)
1975 }
1976
1977 fn current_loop_id(&self) -> Option<String> {
1982 self.loop_context
1983 .as_ref()
1984 .and_then(|ctx| {
1985 let marker_path = ctx.ralph_dir().join("current-loop-id");
1986 std::fs::read_to_string(&marker_path).ok()
1987 })
1988 .map(|id| id.trim().to_string())
1989 .filter(|id| !id.is_empty())
1990 }
1991
1992 fn filter_tasks_by_loop<'a>(
1994 tasks: Vec<&'a crate::task::Task>,
1995 loop_id: Option<&str>,
1996 ) -> Vec<&'a crate::task::Task> {
1997 match loop_id {
1998 Some(id) => tasks
1999 .into_iter()
2000 .filter(|t| t.loop_id.as_deref() == Some(id))
2001 .collect(),
2002 None => tasks,
2003 }
2004 }
2005
2006 fn verify_tasks_complete(&self) -> Result<bool, std::io::Error> {
2007 use crate::task_store::TaskStore;
2008
2009 let tasks_path = self.tasks_path();
2010
2011 if !tasks_path.exists() {
2013 return Ok(true);
2014 }
2015
2016 let store = TaskStore::load(&tasks_path)?;
2017 let current_loop_id = self.current_loop_id();
2018 let open = Self::filter_tasks_by_loop(store.open(), current_loop_id.as_deref());
2019 Ok(open.is_empty())
2020 }
2021
2022 fn build_checkin_context(&self, hat_id: &HatId) -> CheckinContext {
2024 let (open_tasks, closed_tasks) = self.count_tasks();
2025 CheckinContext {
2026 current_hat: Some(hat_id.as_str().to_string()),
2027 open_tasks,
2028 closed_tasks,
2029 cumulative_cost: self.state.cumulative_cost,
2030 }
2031 }
2032
2033 fn count_tasks(&self) -> (usize, usize) {
2038 use crate::task_store::TaskStore;
2039
2040 let tasks_path = self.tasks_path();
2041 if !tasks_path.exists() {
2042 return (0, 0);
2043 }
2044
2045 match TaskStore::load(&tasks_path) {
2046 Ok(store) => {
2047 let current_loop_id = self.current_loop_id();
2048 let all = Self::filter_tasks_by_loop(
2049 store.all().iter().collect(),
2050 current_loop_id.as_deref(),
2051 );
2052 let open = Self::filter_tasks_by_loop(store.open(), current_loop_id.as_deref());
2053 let closed = all.len() - open.len();
2054 (open.len(), closed)
2055 }
2056 Err(_) => (0, 0),
2057 }
2058 }
2059
2060 fn get_open_task_list(&self) -> Vec<String> {
2062 use crate::task_store::TaskStore;
2063
2064 let tasks_path = self.tasks_path();
2065 if let Ok(store) = TaskStore::load(&tasks_path) {
2066 let current_loop_id = self.current_loop_id();
2067 let open = Self::filter_tasks_by_loop(store.open(), current_loop_id.as_deref());
2068 return open
2069 .iter()
2070 .map(|t| format!("{}: {}", t.id, t.title))
2071 .collect();
2072 }
2073 vec![]
2074 }
2075
2076 fn warn_on_mutation_evidence(&self, evidence: &crate::event_parser::BackpressureEvidence) {
2077 let threshold = self.config.event_loop.mutation_score_warn_threshold;
2078
2079 match &evidence.mutants {
2080 Some(mutants) => {
2081 if let Some(reason) = Self::mutation_warning_reason(mutants, threshold) {
2082 warn!(
2083 reason = %reason,
2084 mutants_status = ?mutants.status,
2085 mutants_score = mutants.score_percent,
2086 mutants_threshold = threshold,
2087 "Mutation testing warning"
2088 );
2089 }
2090 }
2091 None => {
2092 if let Some(threshold) = threshold {
2093 warn!(
2094 mutants_threshold = threshold,
2095 "Mutation testing warning: missing mutation evidence in build.done payload"
2096 );
2097 }
2098 }
2099 }
2100 }
2101
2102 fn mutation_warning_reason(
2103 mutants: &MutationEvidence,
2104 threshold: Option<f64>,
2105 ) -> Option<String> {
2106 match mutants.status {
2107 MutationStatus::Fail => Some("mutation testing failed".to_string()),
2108 MutationStatus::Warn => Some(Self::format_mutation_message(
2109 "mutation score below threshold",
2110 mutants.score_percent,
2111 )),
2112 MutationStatus::Unknown => Some("mutation testing status unknown".to_string()),
2113 MutationStatus::Pass => {
2114 let threshold = threshold?;
2115
2116 match mutants.score_percent {
2117 Some(score) if score < threshold => Some(format!(
2118 "mutation score {:.2}% below threshold {:.2}%",
2119 score, threshold
2120 )),
2121 Some(_) => None,
2122 None => Some(format!(
2123 "mutation score missing (threshold {:.2}%)",
2124 threshold
2125 )),
2126 }
2127 }
2128 }
2129 }
2130
2131 fn format_mutation_message(message: &str, score: Option<f64>) -> String {
2132 match score {
2133 Some(score) => format!("{message} ({score:.2}%)"),
2134 None => message.to_string(),
2135 }
2136 }
2137
2138 fn parse_human_interact_context(payload: &str) -> Value {
2139 let mut context = match serde_json::from_str::<Value>(payload) {
2140 Ok(Value::Object(map)) => map,
2141 Ok(value) => {
2142 let mut map = Map::new();
2143 map.insert("question".to_string(), value);
2144 map
2145 }
2146 Err(_) => {
2147 let mut map = Map::new();
2148 map.insert("question".to_string(), Value::String(payload.to_string()));
2149 map
2150 }
2151 };
2152
2153 if !context.contains_key("question") {
2154 context.insert("question".to_string(), Value::String(payload.to_string()));
2155 }
2156
2157 Value::Object(context)
2158 }
2159
2160 fn is_restart_request_payload(payload: &str) -> bool {
2161 let payload = payload.to_ascii_lowercase();
2162 payload.contains("restart yourself") || payload.contains("restart ralph")
2163 }
2164
2165 fn is_restart_request_event(event: &Event) -> bool {
2166 matches!(event.topic.as_str(), "human.response" | "user.prompt")
2167 && Self::is_restart_request_payload(&event.payload)
2168 }
2169
2170 fn mark_restart_requested(&self, source: &str) {
2171 let restart_path =
2172 std::path::Path::new(&self.config.core.workspace_root).join(".ralph/restart-requested");
2173
2174 if let Some(parent) = restart_path.parent()
2175 && let Err(err) = std::fs::create_dir_all(parent)
2176 {
2177 warn!(
2178 error = %err,
2179 path = %parent.display(),
2180 "Failed to create restart-requested parent directory"
2181 );
2182 return;
2183 }
2184
2185 if let Err(err) = std::fs::write(&restart_path, source) {
2186 warn!(
2187 error = %err,
2188 path = %restart_path.display(),
2189 "Failed to write restart-requested signal"
2190 );
2191 return;
2192 }
2193
2194 info!(
2195 source,
2196 path = %restart_path.display(),
2197 "Restart requested from human text"
2198 );
2199 }
2200
    /// Reads newly appended events from the JSONL event reader and processes
    /// them via `process_parse_result`.
    ///
    /// # Errors
    /// Propagates errors from reading new events and from processing them.
    pub fn process_events_from_jsonl(&mut self) -> std::io::Result<ProcessedEvents> {
        let result = self.event_reader.read_new_events()?;
        self.process_parse_result(result)
    }
2216
2217 fn process_parse_result(
2223 &mut self,
2224 result: crate::event_reader::ParseResult,
2225 ) -> std::io::Result<ProcessedEvents> {
2226 for malformed in &result.malformed {
2228 let payload = format!(
2229 "Line {}: {}\nContent: {}",
2230 malformed.line_number, malformed.error, &malformed.content
2231 );
2232 let event = Event::new("event.malformed", &payload);
2233 self.bus.publish(event);
2234 self.state.consecutive_malformed_events += 1;
2235 warn!(
2236 line = malformed.line_number,
2237 consecutive = self.state.consecutive_malformed_events,
2238 "Malformed event line detected"
2239 );
2240 }
2241
2242 if !result.events.is_empty() {
2244 self.state.consecutive_malformed_events = 0;
2245 }
2246
2247 if result.events.is_empty() && result.malformed.is_empty() {
2248 return Ok(ProcessedEvents {
2249 had_events: false,
2250 had_plan_events: false,
2251 human_interact_context: None,
2252 has_orphans: false,
2253 });
2254 }
2255
2256 let events = if self.config.event_loop.enforce_hat_scope {
2259 let active_hats = self.state.last_active_hat_ids.clone();
2260 let (in_scope, out_of_scope): (Vec<_>, Vec<_>) =
2261 result.events.into_iter().partition(|event| {
2262 if active_hats.is_empty() {
2263 return true; }
2265 active_hats
2266 .iter()
2267 .any(|hat_id| self.registry.can_publish(hat_id, event.topic.as_str()))
2268 });
2269
2270 for event in &out_of_scope {
2271 let violation_hat = active_hats.first().map(|h| h.as_str()).unwrap_or("unknown");
2272 warn!(
2273 active_hats = ?active_hats,
2274 topic = %event.topic,
2275 "Scope violation: active hat(s) cannot publish this topic — dropping event"
2276 );
2277 let violation_topic = format!("{}.scope_violation", violation_hat);
2278 let violation_payload = format!(
2279 "Attempted to publish '{}': {}",
2280 event.topic,
2281 event.payload.clone().unwrap_or_default()
2282 );
2283 let violation = Event::new(violation_topic, violation_payload);
2284 self.bus.publish(violation);
2285 }
2286
2287 in_scope
2288 } else {
2289 result.events
2290 };
2291 let mut has_orphans = false;
2294
2295 let mut validated_events = Vec::new();
2297 let completion_topic = self.config.event_loop.completion_promise.as_str();
2298 let cancellation_topic = self.config.event_loop.cancellation_promise.clone();
2299 let total_events = events.len();
2300 for (index, event) in events.into_iter().enumerate() {
2301 let payload = event.payload.clone().unwrap_or_default();
2302
2303 if !cancellation_topic.is_empty() && event.topic.as_str() == cancellation_topic {
2305 info!(
2306 payload = %payload,
2307 "loop.cancel event detected — scheduling graceful termination"
2308 );
2309 self.state.cancellation_requested = true;
2310 continue;
2312 }
2313
2314 if event.topic == completion_topic {
2315 if index + 1 == total_events {
2316 self.state.completion_requested = true;
2317 self.diagnostics.log_orchestration(
2318 self.state.iteration,
2319 "jsonl",
2320 crate::diagnostics::OrchestrationEvent::EventPublished {
2321 topic: event.topic.clone(),
2322 },
2323 );
2324 info!(
2325 topic = %event.topic,
2326 "Completion event detected in JSONL"
2327 );
2328 } else {
2329 warn!(
2330 topic = %event.topic,
2331 index = index,
2332 total_events = total_events,
2333 "Completion event ignored because it was not the last event"
2334 );
2335 }
2336 continue;
2337 }
2338
2339 if event.topic == "build.done" {
2340 if let Some(evidence) = EventParser::parse_backpressure_evidence(&payload) {
2342 if evidence.all_passed() {
2343 self.warn_on_mutation_evidence(&evidence);
2344 validated_events.push(Event::new(event.topic.as_str(), &payload));
2345 } else {
2346 warn!(
2348 tests = evidence.tests_passed,
2349 lint = evidence.lint_passed,
2350 typecheck = evidence.typecheck_passed,
2351 audit = evidence.audit_passed,
2352 coverage = evidence.coverage_passed,
2353 complexity = evidence.complexity_score,
2354 duplication = evidence.duplication_passed,
2355 performance = evidence.performance_regression,
2356 specs = evidence.specs_verified,
2357 "build.done rejected: backpressure checks failed"
2358 );
2359
2360 let complexity = evidence
2361 .complexity_score
2362 .map(|value| format!("{value:.2}"))
2363 .unwrap_or_else(|| "missing".to_string());
2364 let performance = match evidence.performance_regression {
2365 Some(true) => "regression".to_string(),
2366 Some(false) => "pass".to_string(),
2367 None => "missing".to_string(),
2368 };
2369 let specs = match evidence.specs_verified {
2370 Some(true) => "pass".to_string(),
2371 Some(false) => "fail".to_string(),
2372 None => "not reported".to_string(),
2373 };
2374
2375 self.diagnostics.log_orchestration(
2376 self.state.iteration,
2377 "jsonl",
2378 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2379 reason: format!(
2380 "backpressure checks failed: tests={}, lint={}, typecheck={}, audit={}, coverage={}, complexity={}, duplication={}, performance={}, specs={}",
2381 evidence.tests_passed,
2382 evidence.lint_passed,
2383 evidence.typecheck_passed,
2384 evidence.audit_passed,
2385 evidence.coverage_passed,
2386 complexity,
2387 evidence.duplication_passed,
2388 performance,
2389 specs
2390 ),
2391 },
2392 );
2393
2394 validated_events.push(Event::new(
2395 "build.blocked",
2396 "Backpressure checks failed. Fix tests/lint/typecheck/audit/coverage/complexity/duplication/specs before emitting build.done.",
2397 ));
2398 }
2399 } else {
2400 warn!("build.done rejected: missing backpressure evidence");
2402
2403 self.diagnostics.log_orchestration(
2404 self.state.iteration,
2405 "jsonl",
2406 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2407 reason: "missing backpressure evidence".to_string(),
2408 },
2409 );
2410
2411 validated_events.push(Event::new(
2412 "build.blocked",
2413 "Missing backpressure evidence. Include 'tests: pass', 'lint: pass', 'typecheck: pass', 'audit: pass', 'coverage: pass', 'complexity: <score>', 'duplication: pass', 'performance: pass' (optional), 'specs: pass' (optional) in build.done payload.",
2414 ));
2415 }
2416 } else if event.topic == "review.done" && !event.is_wave_event() {
2417 if let Some(evidence) = EventParser::parse_review_evidence(&payload) {
2421 if evidence.is_verified() {
2422 validated_events.push(Event::new(event.topic.as_str(), &payload));
2423 } else {
2424 warn!(
2426 tests = evidence.tests_passed,
2427 build = evidence.build_passed,
2428 "review.done rejected: verification checks failed"
2429 );
2430
2431 self.diagnostics.log_orchestration(
2432 self.state.iteration,
2433 "jsonl",
2434 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2435 reason: format!(
2436 "review verification failed: tests={}, build={}",
2437 evidence.tests_passed, evidence.build_passed
2438 ),
2439 },
2440 );
2441
2442 validated_events.push(Event::new(
2443 "review.blocked",
2444 "Review verification failed. Run tests and build before emitting review.done.",
2445 ));
2446 }
2447 } else {
2448 warn!("review.done rejected: missing verification evidence");
2450
2451 self.diagnostics.log_orchestration(
2452 self.state.iteration,
2453 "jsonl",
2454 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2455 reason: "missing review verification evidence".to_string(),
2456 },
2457 );
2458
2459 validated_events.push(Event::new(
2460 "review.blocked",
2461 "Missing verification evidence. Include 'tests: pass' and 'build: pass' in review.done payload.",
2462 ));
2463 }
2464 } else if event.topic == "verify.passed" {
2465 if let Some(report) = EventParser::parse_quality_report(&payload) {
2466 if report.meets_thresholds() {
2467 validated_events.push(Event::new(event.topic.as_str(), &payload));
2468 } else {
2469 let failed = report.failed_dimensions();
2470 let reason = if failed.is_empty() {
2471 "quality thresholds failed".to_string()
2472 } else {
2473 format!("quality thresholds failed: {}", failed.join(", "))
2474 };
2475
2476 warn!(
2477 failed_dimensions = ?failed,
2478 "verify.passed rejected: quality thresholds failed"
2479 );
2480
2481 self.diagnostics.log_orchestration(
2482 self.state.iteration,
2483 "jsonl",
2484 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2485 reason,
2486 },
2487 );
2488
2489 validated_events.push(Event::new(
2490 "verify.failed",
2491 "Quality thresholds failed. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity with thresholds in verify.passed payload.",
2492 ));
2493 }
2494 } else {
2495 warn!("verify.passed rejected: missing quality report");
2497
2498 self.diagnostics.log_orchestration(
2499 self.state.iteration,
2500 "jsonl",
2501 crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
2502 reason: "missing quality report".to_string(),
2503 },
2504 );
2505
2506 validated_events.push(Event::new(
2507 "verify.failed",
2508 "Missing quality report. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity in verify.passed payload.",
2509 ));
2510 }
2511 } else if event.topic == "verify.failed" {
2512 if EventParser::parse_quality_report(&payload).is_none() {
2513 warn!("verify.failed missing quality report");
2514 }
2515 validated_events.push(Event::new(event.topic.as_str(), &payload));
2516 } else {
2517 validated_events.push(Event::new(event.topic.as_str(), &payload));
2519 }
2520 }
2521
2522 let blocked_events: Vec<_> = validated_events
2524 .iter()
2525 .filter(|e| e.topic == "build.blocked".into())
2526 .collect();
2527
2528 for blocked_event in &blocked_events {
2529 let task_id = Self::extract_task_id(&blocked_event.payload);
2530
2531 let count = self
2532 .state
2533 .task_block_counts
2534 .entry(task_id.clone())
2535 .or_insert(0);
2536 *count += 1;
2537
2538 debug!(
2539 task_id = %task_id,
2540 block_count = *count,
2541 "Task blocked"
2542 );
2543
2544 if *count >= 3 && !self.state.abandoned_tasks.contains(&task_id) {
2546 warn!(
2547 task_id = %task_id,
2548 "Task abandoned after 3 consecutive blocks"
2549 );
2550
2551 self.state.abandoned_tasks.push(task_id.clone());
2552
2553 self.diagnostics.log_orchestration(
2554 self.state.iteration,
2555 "jsonl",
2556 crate::diagnostics::OrchestrationEvent::TaskAbandoned {
2557 reason: format!(
2558 "3 consecutive build.blocked events for task '{}'",
2559 task_id
2560 ),
2561 },
2562 );
2563
2564 let abandoned_event = Event::new(
2565 "build.task.abandoned",
2566 format!(
2567 "Task '{}' abandoned after 3 consecutive build.blocked events",
2568 task_id
2569 ),
2570 );
2571
2572 self.bus.publish(abandoned_event);
2573 }
2574 }
2575
2576 let has_blocked_event = !blocked_events.is_empty();
2578
2579 if has_blocked_event {
2580 self.state.consecutive_blocked += 1;
2581 } else {
2582 self.state.consecutive_blocked = 0;
2583 self.state.last_blocked_hat = None;
2584 }
2585
2586 let mut response_event = None;
2590 let mut human_interact_context = None;
2591 let ask_human_idx = validated_events
2592 .iter()
2593 .position(|e| e.topic == "human.interact".into());
2594
2595 if let Some(idx) = ask_human_idx {
2596 let ask_event = &validated_events[idx];
2597 let payload = ask_event.payload.clone();
2598
2599 let mut context = match Self::parse_human_interact_context(&payload) {
2600 Value::Object(map) => map,
2601 _ => Map::new(),
2602 };
2603
2604 if let Some(ref robot_service) = self.robot_service {
2605 info!(
2606 payload = %payload,
2607 "human.interact event detected — sending question via robot service"
2608 );
2609
2610 let send_ok = match robot_service.send_question(&payload) {
2612 Ok(_message_id) => true,
2613 Err(e) => {
2614 warn!(
2615 error = %e,
2616 "Failed to send human.interact question after retries — treating as timeout"
2617 );
2618 self.diagnostics.log_error(
2620 self.state.iteration,
2621 "telegram",
2622 crate::diagnostics::DiagnosticError::TelegramSendError {
2623 operation: "send_question".to_string(),
2624 error: e.to_string(),
2625 retry_count: 3,
2626 },
2627 );
2628 context.insert(
2629 "outcome".to_string(),
2630 Value::String("send_failure".to_string()),
2631 );
2632 context.insert("error".to_string(), Value::String(e.to_string()));
2633 false
2634 }
2635 };
2636
2637 if send_ok {
2640 let events_path = self
2643 .loop_context
2644 .as_ref()
2645 .and_then(|ctx| {
2646 std::fs::read_to_string(ctx.current_events_marker())
2647 .ok()
2648 .map(|s| ctx.workspace().join(s.trim()))
2649 })
2650 .or_else(|| {
2651 std::fs::read_to_string(".ralph/current-events")
2652 .ok()
2653 .map(|s| PathBuf::from(s.trim()))
2654 })
2655 .unwrap_or_else(|| {
2656 self.loop_context
2657 .as_ref()
2658 .map(|ctx| ctx.events_path())
2659 .unwrap_or_else(|| PathBuf::from(".ralph/events.jsonl"))
2660 });
2661
2662 match robot_service.wait_for_response(&events_path) {
2663 Ok(Some(response)) => {
2664 info!(
2665 response = %response,
2666 "Received human.response — continuing loop"
2667 );
2668 context.insert(
2669 "outcome".to_string(),
2670 Value::String("response".to_string()),
2671 );
2672 context.insert("response".to_string(), Value::String(response.clone()));
2673 response_event = Some(Event::new("human.response", &response));
2675 }
2676 Ok(None) => {
2677 warn!(
2678 timeout_secs = robot_service.timeout_secs(),
2679 "Human response timeout — injecting human.timeout event"
2680 );
2681 context.insert(
2682 "outcome".to_string(),
2683 Value::String("timeout".to_string()),
2684 );
2685 context.insert(
2686 "timeout_seconds".to_string(),
2687 Value::from(robot_service.timeout_secs()),
2688 );
2689 let timeout_event = Event::new(
2690 "human.timeout",
2691 format!(
2692 "No response after {}s. Original question: {}",
2693 robot_service.timeout_secs(),
2694 payload
2695 ),
2696 );
2697 response_event = Some(timeout_event);
2698 }
2699 Err(e) => {
2700 warn!(
2701 error = %e,
2702 "Error waiting for human response — injecting human.timeout event"
2703 );
2704 context.insert(
2705 "outcome".to_string(),
2706 Value::String("wait_error".to_string()),
2707 );
2708 context.insert("error".to_string(), Value::String(e.to_string()));
2709 let timeout_event = Event::new(
2710 "human.timeout",
2711 format!(
2712 "Error waiting for response: {}. Original question: {}",
2713 e, payload
2714 ),
2715 );
2716 response_event = Some(timeout_event);
2717 }
2718 }
2719 }
2720 } else {
2721 debug!(
2722 "human.interact event detected but no robot service active — passing through"
2723 );
2724 context.insert(
2725 "outcome".to_string(),
2726 Value::String("no_robot_service".to_string()),
2727 );
2728 }
2729
2730 human_interact_context = Some(Value::Object(context));
2731 }
2732
2733 let restart_requested = validated_events.iter().any(Self::is_restart_request_event)
2734 || response_event
2735 .as_ref()
2736 .is_some_and(Self::is_restart_request_event);
2737 if restart_requested {
2738 self.mark_restart_requested("human_text");
2739 }
2740
2741 let had_events = !validated_events.is_empty();
2743 let had_plan_events = validated_events
2744 .iter()
2745 .any(|event| event.topic.as_str().starts_with("plan."));
2746
2747 for event in validated_events {
2752 self.state.record_event(&event);
2754
2755 self.diagnostics.log_orchestration(
2756 self.state.iteration,
2757 "jsonl",
2758 crate::diagnostics::OrchestrationEvent::EventPublished {
2759 topic: event.topic.to_string(),
2760 },
2761 );
2762
2763 if !self.registry.has_subscriber(event.topic.as_str()) {
2764 has_orphans = true;
2765 }
2766
2767 debug!(
2768 topic = %event.topic,
2769 "Publishing event from JSONL"
2770 );
2771 self.bus.publish(event);
2772 }
2773
2774 if let Some(response) = response_event {
2776 self.state.record_event(&response);
2777 info!(
2778 topic = %response.topic,
2779 "Publishing human.response event from robot service"
2780 );
2781 self.bus.publish(response);
2782 }
2783
2784 Ok(ProcessedEvents {
2785 had_events,
2786 had_plan_events,
2787 human_interact_context,
2788 has_orphans,
2789 })
2790 }
2791
2792 pub fn process_events_from_jsonl_with_waves(
2798 &mut self,
2799 ) -> std::io::Result<ProcessedEventsWithWaves> {
2800 let result = self.event_reader.read_new_events()?;
2801
2802 let (wave_events, regular_events): (Vec<_>, Vec<_>) =
2810 result.events.into_iter().partition(|e| {
2811 e.wave_id.is_some()
2812 && self
2813 .registry
2814 .find_by_trigger(e.topic.as_str())
2815 .and_then(|hat_id| self.registry.get_config(hat_id))
2816 .is_some_and(|hat_config| hat_config.concurrency > 1)
2817 });
2818
2819 if !wave_events.is_empty() {
2820 debug!(
2821 wave_count = wave_events.len(),
2822 regular_count = regular_events.len(),
2823 "Partitioned wave events from regular events"
2824 );
2825 }
2826
2827 let regular_result = crate::event_reader::ParseResult {
2830 events: regular_events,
2831 malformed: result.malformed,
2832 };
2833 let processed = self.process_parse_result(regular_result)?;
2834
2835 Ok(ProcessedEventsWithWaves {
2836 processed,
2837 wave_events,
2838 })
2839 }
2840
2841 pub fn check_ralph_completion(&self, output: &str) -> bool {
2845 let events = EventParser::new().parse(output);
2846 events
2847 .iter()
2848 .any(|event| event.topic.as_str() == self.config.event_loop.completion_promise)
2849 }
2850
2851 pub fn publish_terminate_event(&mut self, reason: &TerminationReason) -> Event {
2858 self.stop_robot_service();
2860
2861 let elapsed = self.state.elapsed();
2862 let duration_str = format_duration(elapsed);
2863
2864 let payload = format!(
2865 "## Reason\n{}\n\n## Status\n{}\n\n## Summary\n- Iterations: {}\n- Duration: {}\n- Exit code: {}",
2866 reason.as_str(),
2867 termination_status_text(reason),
2868 self.state.iteration,
2869 duration_str,
2870 reason.exit_code()
2871 );
2872
2873 let event = Event::new("loop.terminate", &payload);
2874
2875 self.bus.publish(event.clone());
2877
2878 info!(
2879 reason = %reason.as_str(),
2880 iterations = self.state.iteration,
2881 duration = %duration_str,
2882 "Wrapping up: {}. {} iterations in {}.",
2883 reason.as_str(),
2884 self.state.iteration,
2885 duration_str
2886 );
2887
2888 event
2889 }
2890
2891 pub fn robot_shutdown_flag(&self) -> Option<Arc<AtomicBool>> {
2896 self.robot_service.as_ref().map(|s| s.shutdown_flag())
2897 }
2898
2899 fn stop_robot_service(&mut self) {
2903 if let Some(service) = self.robot_service.take() {
2904 service.stop();
2905 }
2906 }
2907
2908 pub fn check_for_user_prompt(&self, events: &[Event]) -> Option<UserPrompt> {
2916 events
2917 .iter()
2918 .find(|e| e.topic.as_str() == "user.prompt")
2919 .map(|e| UserPrompt {
2920 id: Self::extract_prompt_id(&e.payload),
2921 text: e.payload.clone(),
2922 })
2923 }
2924
2925 fn extract_prompt_id(payload: &str) -> String {
2930 if let Some(start) = payload.find("id=\"")
2932 && let Some(end) = payload[start + 4..].find('"')
2933 {
2934 return payload[start + 4..start + 4 + end].to_string();
2935 }
2936
2937 format!("q{}", Self::generate_prompt_id())
2939 }
2940
/// Produces a short hexadecimal id derived from the current wall-clock time in nanoseconds.
///
/// The value is reduced modulo `0xFFFF_FFFF`, so the result is at most 8 hex digits.
/// If the system clock reports a time before the Unix epoch, a zero duration is used
/// instead of panicking. The id is only a best-effort suffix, so a crash would be far
/// worse than a low-entropy value.
fn generate_prompt_id() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    format!("{:x}", nanos % 0xFFFF_FFFF)
}
2951}
2952
/// A `user.prompt` event surfaced to the caller by `check_for_user_prompt`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UserPrompt {
    /// Identifier taken from the payload's `id="…"` attribute, or a generated `q<hex>` id.
    pub id: String,
    /// The complete payload of the `user.prompt` event.
    pub text: String,
}
2963
/// Renders a duration as a compact human-readable string in whole seconds.
///
/// Leading zero units are omitted: `"42s"`, `"3m 5s"`, `"2h 0m 7s"`. Sub-second
/// precision is discarded.
fn format_duration(d: Duration) -> String {
    let secs = d.as_secs();
    let (h, rem) = (secs / 3600, secs % 3600);
    let (m, s) = (rem / 60, rem % 60);

    match (h, m) {
        (0, 0) => format!("{}s", s),
        (0, _) => format!("{}m {}s", m, s),
        _ => format!("{}h {}m {}s", h, m, s),
    }
}
2979
2980fn termination_status_text(reason: &TerminationReason) -> &'static str {
2982 match reason {
2983 TerminationReason::CompletionPromise => "All tasks completed successfully.",
2984 TerminationReason::MaxIterations => "Stopped at iteration limit.",
2985 TerminationReason::MaxRuntime => "Stopped at runtime limit.",
2986 TerminationReason::MaxCost => "Stopped at cost limit.",
2987 TerminationReason::ConsecutiveFailures => "Too many consecutive failures.",
2988 TerminationReason::LoopThrashing => {
2989 "Loop thrashing detected - same hat repeatedly blocked."
2990 }
2991 TerminationReason::LoopStale => {
2992 "Stale loop detected - same topic emitted 3+ times consecutively."
2993 }
2994 TerminationReason::ValidationFailure => "Too many consecutive malformed JSONL events.",
2995 TerminationReason::Stopped => "Manually stopped.",
2996 TerminationReason::Interrupted => "Interrupted by signal.",
2997 TerminationReason::RestartRequested => "Restarting by human request.",
2998 TerminationReason::WorkspaceGone => "Workspace directory removed externally.",
2999 TerminationReason::Cancelled => "Cancelled gracefully (human rejection or timeout).",
3000 }
3001}