1mod loop_state;
6#[cfg(test)]
7mod tests;
8
9pub use loop_state::LoopState;
10
11use crate::config::{HatBackend, InjectMode, RalphConfig};
12use crate::event_parser::{EventParser, MutationEvidence, MutationStatus};
13use crate::event_reader::EventReader;
14use crate::hat_registry::HatRegistry;
15use crate::hatless_ralph::HatlessRalph;
16use crate::instructions::InstructionBuilder;
17use crate::loop_context::LoopContext;
18use crate::memory_store::{MarkdownMemoryStore, format_memories_as_markdown, truncate_to_budget};
19use crate::skill_registry::SkillRegistry;
20use crate::text::floor_char_boundary;
21use ralph_proto::{CheckinContext, Event, EventBus, Hat, HatId, RobotService};
22use serde_json::{Map, Value};
23use std::path::PathBuf;
24use std::sync::Arc;
25use std::sync::atomic::AtomicBool;
26use std::time::Duration;
27use tracing::{debug, info, warn};
28
/// Summary of one pass over newly read events.
///
/// NOTE(review): this type is populated outside this chunk; the field descriptions
/// below are inferred from names and nearby code and should be confirmed.
#[derive(Debug, Clone)]
pub struct ProcessedEvents {
    /// Presumably true when at least one event was processed in this pass.
    pub had_events: bool,
    /// Presumably true when any processed event had a `plan.*` topic (compare
    /// `has_pending_plan_events_in_jsonl`).
    pub had_plan_events: bool,
    /// Context parsed from a `human.interact` event, if one was seen — presumably via
    /// `parse_human_interact_context`, as in `pending_human_interact_context_in_jsonl`.
    pub human_interact_context: Option<Value>,
    /// Presumably true when some events had no subscribed hat — TODO confirm.
    pub has_orphans: bool,
}
42
/// [`ProcessedEvents`] plus the raw event-file records set aside as wave events.
///
/// NOTE(review): which records count as "wave" events is decided outside this chunk.
#[derive(Debug)]
pub struct ProcessedEventsWithWaves {
    /// The regular processing summary.
    pub processed: ProcessedEvents,
    /// Raw events (as read by the `EventReader`) set aside for wave handling.
    pub wave_events: Vec<crate::event_reader::Event>,
}
51
/// Why the event loop stopped.
///
/// Every variant has a process exit code ([`TerminationReason::exit_code`]) and a
/// stable snake_case label ([`TerminationReason::as_str`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationReason {
    /// The completion event was accepted.
    CompletionPromise,
    /// The iteration counter reached `event_loop.max_iterations`.
    MaxIterations,
    /// Elapsed run time reached `event_loop.max_runtime_seconds`.
    MaxRuntime,
    /// Cumulative cost reached `event_loop.max_cost_usd`.
    MaxCost,
    /// Consecutive failures reached `event_loop.max_consecutive_failures`.
    ConsecutiveFailures,
    /// Abandoned tasks were re-dispatched too many times.
    LoopThrashing,
    /// The same event signature was emitted too many times in a row.
    LoopStale,
    /// Too many consecutive malformed events.
    ValidationFailure,
    /// A `.ralph/stop-requested` file was found in the workspace.
    Stopped,
    /// The run was interrupted (exit code follows the SIGINT shell convention).
    Interrupted,
    /// A `.ralph/restart-requested` file was found in the workspace.
    RestartRequested,
    /// The workspace root is no longer a directory.
    WorkspaceGone,
    /// The loop was cancelled gracefully via a `loop.cancel` event.
    Cancelled,
}

impl TerminationReason {
    /// Process exit code for this reason.
    ///
    /// * `0` — completed, or cancelled gracefully
    /// * `1` — failure, health-check, or stop conditions
    /// * `2` — a configured limit (iterations, runtime, cost) was reached
    /// * `3` — restart requested
    /// * `130` — interrupted
    pub fn exit_code(&self) -> i32 {
        match self {
            Self::CompletionPromise | Self::Cancelled => 0,
            Self::MaxIterations | Self::MaxRuntime | Self::MaxCost => 2,
            Self::RestartRequested => 3,
            Self::Interrupted => 130,
            Self::ConsecutiveFailures
            | Self::LoopThrashing
            | Self::LoopStale
            | Self::ValidationFailure
            | Self::Stopped
            | Self::WorkspaceGone => 1,
        }
    }

    /// Stable snake_case label, suitable for logs and machine-readable output.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::CompletionPromise => "completed",
            Self::MaxIterations => "max_iterations",
            Self::MaxRuntime => "max_runtime",
            Self::MaxCost => "max_cost",
            Self::ConsecutiveFailures => "consecutive_failures",
            Self::LoopThrashing => "loop_thrashing",
            Self::LoopStale => "loop_stale",
            Self::ValidationFailure => "validation_failure",
            Self::Stopped => "stopped",
            Self::Interrupted => "interrupted",
            Self::RestartRequested => "restart_requested",
            Self::WorkspaceGone => "workspace_gone",
            Self::Cancelled => "cancelled",
        }
    }

    /// True only for an accepted completion; a graceful cancel exits 0 but is not a success.
    pub fn is_success(&self) -> bool {
        *self == Self::CompletionPromise
    }
}
138
/// The orchestration loop: routes events between hats on an [`EventBus`], tracks loop
/// state, and builds the prompt for each iteration.
pub struct EventLoop {
    /// Full configuration the loop was built from.
    config: RalphConfig,
    /// Custom hats from the config (empty in solo mode).
    registry: HatRegistry,
    /// Event bus with every registered hat plus Ralph (subscribed to `*`).
    bus: EventBus,
    /// Counters and flags used by the termination and completion checks.
    state: LoopState,
    /// Renders prompts for custom (non-Ralph) hats.
    instruction_builder: InstructionBuilder,
    /// The hatless Ralph coordinator that builds Ralph's prompts.
    ralph: HatlessRalph,
    /// Accumulated `human.guidance` payloads, applied to Ralph when building prompts.
    robot_guidance: Vec<String>,
    /// Reader for the JSONL events file.
    pub(crate) event_reader: EventReader,
    /// Diagnostics sink (possibly a disabled collector).
    diagnostics: crate::diagnostics::DiagnosticsCollector,
    /// Present when built via `with_context*`; `None` for `new`/`with_diagnostics`.
    loop_context: Option<LoopContext>,
    /// Skills available for prompt injection.
    skill_registry: SkillRegistry,
    /// Optional robot service installed via `set_robot_service` (used outside this chunk).
    robot_service: Option<Box<dyn RobotService>>,
}
161
162impl EventLoop {
163 pub fn new(config: RalphConfig) -> Self {
165 let diagnostics = crate::diagnostics::DiagnosticsCollector::new(std::path::Path::new("."))
168 .unwrap_or_else(|e| {
169 debug!(
170 "Failed to initialize diagnostics: {}, using disabled collector",
171 e
172 );
173 crate::diagnostics::DiagnosticsCollector::disabled()
174 });
175
176 Self::with_diagnostics(config, diagnostics)
177 }
178
179 pub fn with_context(config: RalphConfig, context: LoopContext) -> Self {
185 let diagnostics = crate::diagnostics::DiagnosticsCollector::new(context.workspace())
186 .unwrap_or_else(|e| {
187 debug!(
188 "Failed to initialize diagnostics: {}, using disabled collector",
189 e
190 );
191 crate::diagnostics::DiagnosticsCollector::disabled()
192 });
193
194 Self::with_context_and_diagnostics(config, context, diagnostics)
195 }
196
197 pub fn with_context_and_diagnostics(
199 config: RalphConfig,
200 context: LoopContext,
201 diagnostics: crate::diagnostics::DiagnosticsCollector,
202 ) -> Self {
203 let registry = HatRegistry::from_config(&config);
204 let instruction_builder =
205 InstructionBuilder::with_events(config.core.clone(), config.events.clone());
206
207 let mut bus = EventBus::new();
208
209 for hat in registry.all() {
213 bus.register(hat.clone());
214 }
215
216 let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); bus.register(ralph_hat);
220
221 if registry.is_empty() {
222 debug!("Solo mode: Ralph is the only coordinator");
223 } else {
224 debug!(
225 "Multi-hat mode: {} custom hats + Ralph as fallback",
226 registry.len()
227 );
228 }
229
230 let mut skill_registry = if config.skills.enabled {
232 SkillRegistry::from_config(
233 &config.skills,
234 context.workspace(),
235 Some(config.cli.backend.as_str()),
236 )
237 .unwrap_or_else(|e| {
238 warn!(
239 "Failed to build skill registry: {}, using empty registry",
240 e
241 );
242 SkillRegistry::new(Some(config.cli.backend.as_str()))
243 })
244 } else {
245 SkillRegistry::new(Some(config.cli.backend.as_str()))
246 };
247
248 if !config.tasks.enabled {
250 skill_registry.remove("ralph-tools-tasks");
251 }
252 if !config.memories.enabled {
253 skill_registry.remove("ralph-tools-memories");
254 }
255
256 let skill_index = if config.skills.enabled {
257 skill_registry.build_index(None)
258 } else {
259 String::new()
260 };
261
262 let ralph = HatlessRalph::new(
264 config.event_loop.completion_promise.clone(),
265 config.core.clone(),
266 ®istry,
267 config.event_loop.starting_event.clone(),
268 )
269 .with_memories_enabled(config.memories.enabled)
270 .with_skill_index(skill_index);
271
272 let events_path = std::fs::read_to_string(context.current_events_marker())
276 .map(|s| {
277 let relative = s.trim();
278 context.workspace().join(relative)
279 })
280 .unwrap_or_else(|_| context.events_path());
281 let event_reader = EventReader::new(&events_path);
282
283 Self {
284 config,
285 registry,
286 bus,
287 state: LoopState::new(),
288 instruction_builder,
289 ralph,
290 robot_guidance: Vec::new(),
291 event_reader,
292 diagnostics,
293 loop_context: Some(context),
294 skill_registry,
295 robot_service: None,
296 }
297 }
298
299 pub fn with_diagnostics(
301 config: RalphConfig,
302 diagnostics: crate::diagnostics::DiagnosticsCollector,
303 ) -> Self {
304 let registry = HatRegistry::from_config(&config);
305 let instruction_builder =
306 InstructionBuilder::with_events(config.core.clone(), config.events.clone());
307
308 let mut bus = EventBus::new();
309
310 for hat in registry.all() {
314 bus.register(hat.clone());
315 }
316
317 let ralph_hat = ralph_proto::Hat::new("ralph", "Ralph").subscribe("*"); bus.register(ralph_hat);
321
322 if registry.is_empty() {
323 debug!("Solo mode: Ralph is the only coordinator");
324 } else {
325 debug!(
326 "Multi-hat mode: {} custom hats + Ralph as fallback",
327 registry.len()
328 );
329 }
330
331 let workspace_root = std::path::Path::new(".");
333 let mut skill_registry = if config.skills.enabled {
334 SkillRegistry::from_config(
335 &config.skills,
336 workspace_root,
337 Some(config.cli.backend.as_str()),
338 )
339 .unwrap_or_else(|e| {
340 warn!(
341 "Failed to build skill registry: {}, using empty registry",
342 e
343 );
344 SkillRegistry::new(Some(config.cli.backend.as_str()))
345 })
346 } else {
347 SkillRegistry::new(Some(config.cli.backend.as_str()))
348 };
349
350 if !config.tasks.enabled {
352 skill_registry.remove("ralph-tools-tasks");
353 }
354 if !config.memories.enabled {
355 skill_registry.remove("ralph-tools-memories");
356 }
357
358 let skill_index = if config.skills.enabled {
359 skill_registry.build_index(None)
360 } else {
361 String::new()
362 };
363
364 let ralph = HatlessRalph::new(
366 config.event_loop.completion_promise.clone(),
367 config.core.clone(),
368 ®istry,
369 config.event_loop.starting_event.clone(),
370 )
371 .with_memories_enabled(config.memories.enabled)
372 .with_skill_index(skill_index);
373
374 let events_path = std::fs::read_to_string(".ralph/current-events")
377 .map(|s| s.trim().to_string())
378 .unwrap_or_else(|_| ".ralph/events.jsonl".to_string());
379 let event_reader = EventReader::new(&events_path);
380
381 Self {
382 config,
383 registry,
384 bus,
385 state: LoopState::new(),
386 instruction_builder,
387 ralph,
388 robot_guidance: Vec::new(),
389 event_reader,
390 diagnostics,
391 loop_context: None,
392 skill_registry,
393 robot_service: None,
394 }
395 }
396
397 pub fn set_robot_service(&mut self, service: Box<dyn RobotService>) {
405 self.robot_service = Some(service);
406 }
407
408 pub fn loop_context(&self) -> Option<&LoopContext> {
410 self.loop_context.as_ref()
411 }
412
413 fn tasks_path(&self) -> PathBuf {
415 self.loop_context
416 .as_ref()
417 .map(|ctx| ctx.tasks_path())
418 .unwrap_or_else(|| PathBuf::from(".ralph/agent/tasks.jsonl"))
419 }
420
421 fn scratchpad_path(&self) -> PathBuf {
427 if self.config.core.scratchpad != ".ralph/agent/scratchpad.md" {
428 return PathBuf::from(&self.config.core.scratchpad);
429 }
430 self.loop_context
431 .as_ref()
432 .map(|ctx| ctx.scratchpad_path())
433 .unwrap_or_else(|| PathBuf::from(&self.config.core.scratchpad))
434 }
435
436 pub fn state(&self) -> &LoopState {
438 &self.state
439 }
440
441 pub fn reset_stale_topic_counter(&mut self) {
447 self.state.consecutive_same_signature = 0;
448 self.state.last_emitted_signature = None;
449 }
450
451 pub fn config(&self) -> &RalphConfig {
453 &self.config
454 }
455
456 pub fn registry(&self) -> &HatRegistry {
458 &self.registry
459 }
460
461 pub fn log_hook_run_telemetry(&self, entry: crate::diagnostics::HookRunTelemetryEntry) {
463 self.diagnostics.log_hook_run(entry);
464 }
465
466 pub fn log_prompt(&self, iteration: u32, hat: &str, prompt: &str) {
468 self.diagnostics.log_prompt(iteration, hat, prompt);
469 }
470
471 pub fn get_hat_backend(&self, hat_id: &HatId) -> Option<&HatBackend> {
476 self.registry
477 .get_config(hat_id)
478 .and_then(|config| config.backend.as_ref())
479 }
480
481 pub fn add_observer<F>(&mut self, observer: F)
486 where
487 F: Fn(&Event) + Send + 'static,
488 {
489 self.bus.add_observer(observer);
490 }
491
492 #[deprecated(since = "2.0.0", note = "Use add_observer instead")]
496 pub fn set_observer<F>(&mut self, observer: F)
497 where
498 F: Fn(&Event) + Send + 'static,
499 {
500 #[allow(deprecated)]
501 self.bus.set_observer(observer);
502 }
503
504 pub fn check_termination(&self) -> Option<TerminationReason> {
506 let cfg = &self.config.event_loop;
507
508 if self.state.iteration >= cfg.max_iterations {
509 return Some(TerminationReason::MaxIterations);
510 }
511
512 if self.state.elapsed().as_secs() >= cfg.max_runtime_seconds {
513 return Some(TerminationReason::MaxRuntime);
514 }
515
516 if let Some(max_cost) = cfg.max_cost_usd
517 && self.state.cumulative_cost >= max_cost
518 {
519 return Some(TerminationReason::MaxCost);
520 }
521
522 if self.state.consecutive_failures >= cfg.max_consecutive_failures {
523 return Some(TerminationReason::ConsecutiveFailures);
524 }
525
526 if self.state.abandoned_task_redispatches >= 3 {
528 return Some(TerminationReason::LoopThrashing);
529 }
530
531 if self.state.consecutive_malformed_events >= 3 {
533 return Some(TerminationReason::ValidationFailure);
534 }
535
536 if self.state.consecutive_same_signature >= 3 {
538 let topic = self
539 .state
540 .last_emitted_signature
541 .as_ref()
542 .map(|signature| signature.topic.as_str())
543 .unwrap_or("?");
544 warn!(
545 topic,
546 count = self.state.consecutive_same_signature,
547 "Stale loop detected: same event signature emitted consecutively"
548 );
549 return Some(TerminationReason::LoopStale);
550 }
551
552 let stop_path =
554 std::path::Path::new(&self.config.core.workspace_root).join(".ralph/stop-requested");
555 if stop_path.exists() {
556 let _ = std::fs::remove_file(&stop_path);
557 return Some(TerminationReason::Stopped);
558 }
559
560 let restart_path =
562 std::path::Path::new(&self.config.core.workspace_root).join(".ralph/restart-requested");
563 if restart_path.exists() {
564 return Some(TerminationReason::RestartRequested);
565 }
566
567 if !std::path::Path::new(&self.config.core.workspace_root).is_dir() {
569 return Some(TerminationReason::WorkspaceGone);
570 }
571
572 None
573 }
574
575 pub fn check_cancellation_event(&mut self) -> Option<TerminationReason> {
580 if !self.state.cancellation_requested {
581 return None;
582 }
583 self.state.cancellation_requested = false;
584 info!("Loop cancelled gracefully via loop.cancel event");
585
586 self.diagnostics.log_orchestration(
587 self.state.iteration,
588 "loop",
589 crate::diagnostics::OrchestrationEvent::LoopTerminated {
590 reason: "cancelled".to_string(),
591 },
592 );
593
594 Some(TerminationReason::Cancelled)
595 }
596
/// Handles a pending completion request (`state.completion_requested`) and decides
/// whether the loop may actually terminate.
///
/// Returns `Some(TerminationReason::CompletionPromise)` when completion is accepted.
/// Returns `None` when no completion was requested, or when completion is rejected or
/// suppressed. In the rejected/suppressed cases a `task.resume` event is published so
/// the loop continues.
///
/// Gates, checked in order:
/// 1. Every topic in `event_loop.required_events` must have been seen during the loop.
/// 2. In `event_loop.persistent` mode, completion is always suppressed.
/// 3. With memories enabled, no runtime tasks may remain open. Otherwise only the
///    scratchpad is checked, and pending scratchpad items produce just a warning.
pub fn check_completion_event(&mut self) -> Option<TerminationReason> {
    if !self.state.completion_requested {
        return None;
    }

    // Gate 1: required workflow events.
    let required = &self.config.event_loop.required_events;
    if !required.is_empty() {
        let missing = self.state.missing_required_events(required);
        if !missing.is_empty() {
            warn!(
                missing = ?missing,
                "Rejecting LOOP_COMPLETE: required events not seen during loop lifetime"
            );
            self.state.completion_requested = false;

            // Tell the agent why completion was refused and how to abort instead.
            let resume_payload = format!(
                "LOOP_COMPLETE rejected: missing required events: {:?}. \
                The agent must complete all workflow phases before emitting LOOP_COMPLETE. \
                Use loop.cancel to abort the workflow instead.",
                missing
            );
            self.bus.publish(Event::new("task.resume", resume_payload));
            return None;
        }
    }

    // Every remaining path consumes the request.
    self.state.completion_requested = false;

    // Gate 2: persistent mode never terminates on a completion event.
    if self.config.event_loop.persistent {
        info!("Completion event suppressed - persistent mode active, loop staying alive");

        // NOTE(review): recorded as `LoopTerminated` even though the loop keeps running;
        // only the reason string distinguishes the suppression.
        self.diagnostics.log_orchestration(
            self.state.iteration,
            "loop",
            crate::diagnostics::OrchestrationEvent::LoopTerminated {
                reason: "completion_event_suppressed_persistent".to_string(),
            },
        );

        let resume_event = Event::new(
            "task.resume",
            "Persistent mode: loop staying alive after completion signal. \
            Check for new tasks or await human guidance.",
        );
        self.bus.publish(resume_event);

        return None;
    }

    // Gate 3: open runtime tasks block completion.
    // NOTE(review): this check is keyed on `memories.enabled`, not `tasks.enabled` —
    // confirm the coupling is intended.
    if self.config.memories.enabled {
        // Only `Ok(false)` blocks completion; a verification error lets it through.
        if let Ok(false) = self.verify_tasks_complete() {
            let open_tasks = self.get_open_task_list();
            warn!(
                open_tasks = ?open_tasks,
                "Rejecting completion event with {} open task(s)",
                open_tasks.len()
            );
            self.bus.publish(Event::new(
                "task.resume",
                format!(
                    "Completion rejected: runtime tasks remain open: {:?}. Close, fail, or reopen outstanding tasks before emitting the completion promise.",
                    open_tasks
                ),
            ));
            return None;
        }
    } else if let Ok(false) = self.verify_scratchpad_complete() {
        // Scratchpad-only mode: pending items are logged but do not block completion.
        warn!("Completion event with pending scratchpad tasks - trusting agent decision");
    }

    info!("Completion event detected - terminating");

    self.diagnostics.log_orchestration(
        self.state.iteration,
        "loop",
        crate::diagnostics::OrchestrationEvent::LoopTerminated {
            reason: "completion_event".to_string(),
        },
    );

    Some(TerminationReason::CompletionPromise)
}
688
689 pub fn initialize(&mut self, prompt_content: &str) {
691 let topic = self
693 .config
694 .event_loop
695 .starting_event
696 .clone()
697 .unwrap_or_else(|| "task.start".to_string());
698 self.initialize_with_topic(&topic, prompt_content);
699 }
700
701 pub fn initialize_resume(&mut self, prompt_content: &str) {
706 self.initialize_with_topic("task.resume", prompt_content);
708 }
709
710 fn initialize_with_topic(&mut self, topic: &str, prompt_content: &str) {
712 self.ralph.set_objective(prompt_content.to_string());
716
717 let start_event = Event::new(topic, prompt_content);
718 self.bus.publish(start_event);
719 debug!(topic = topic, "Published {} event", topic);
720 }
721
722 pub fn next_hat(&self) -> Option<&HatId> {
731 let next = self.bus.next_hat_with_pending();
732
733 if next.is_none() && self.bus.has_human_pending() {
735 return self.bus.hat_ids().find(|id| id.as_str() == "ralph");
736 }
737
738 next.as_ref()?;
740
741 if self.registry.is_empty() {
744 next
746 } else {
747 self.bus.hat_ids().find(|id| id.as_str() == "ralph")
750 }
751 }
752
753 pub fn sync_event_reader_to_file_end(&mut self) {
760 let path = self.event_reader.path();
761 if let Ok(metadata) = std::fs::metadata(path) {
762 self.event_reader.set_position(metadata.len());
763 }
764 }
765
766 pub fn has_pending_events(&self) -> bool {
771 self.bus.next_hat_with_pending().is_some() || self.bus.has_human_pending()
772 }
773
774 pub fn has_pending_human_events(&self) -> bool {
779 self.bus.has_human_pending()
780 }
781
782 pub fn inject_human_guidance<I, S>(&mut self, messages: I)
787 where
788 I: IntoIterator<Item = S>,
789 S: Into<String>,
790 {
791 for message in messages {
792 let event = Event::new("human.guidance", message.into());
793 self.state.record_event(&event);
794 self.bus.publish(event);
795 }
796 }
797
798 pub fn has_pending_plan_events_in_jsonl(&self) -> std::io::Result<bool> {
803 let result = self.event_reader.peek_new_events()?;
804 Ok(result
805 .events
806 .iter()
807 .any(|event| event.topic.starts_with("plan.")))
808 }
809
810 pub fn pending_human_interact_context_in_jsonl(&self) -> std::io::Result<Option<Value>> {
813 let result = self.event_reader.peek_new_events()?;
814 Ok(result
815 .events
816 .iter()
817 .find(|event| event.topic == "human.interact")
818 .map(|event| {
819 Self::parse_human_interact_context(event.payload.as_deref().unwrap_or_default())
820 }))
821 }
822
823 pub fn get_hat_publishes(&self, hat_id: &HatId) -> Vec<String> {
827 self.registry
828 .get(hat_id)
829 .map(|hat| hat.publishes.iter().map(|t| t.to_string()).collect())
830 .unwrap_or_default()
831 }
832
833 pub fn inject_fallback_event(&mut self) -> bool {
840 let fallback_event = match &self.state.last_hat {
843 Some(hat_id) if hat_id.as_str() != "ralph" => {
844 let publishes = self.get_hat_publishes(hat_id);
845 let payload = if publishes.is_empty() {
846 format!(
847 "RECOVERY: Previous iteration by hat `{}` did not publish an event. \
848 Emit exactly one valid next event via `ralph emit`, or stop only after \
849 publishing the configured completion event.",
850 hat_id.as_str()
851 )
852 } else {
853 format!(
854 "RECOVERY: Previous iteration by hat `{}` did not publish an event. \
855 This failed because no event was emitted. Emit exactly ONE valid next \
856 event via `ralph emit`. Allowed topics: `{}`. Do not only write prose \
857 or update files. Stop immediately after emitting.",
858 hat_id.as_str(),
859 publishes.join("`, `")
860 )
861 };
862
863 debug!(
864 hat = %hat_id.as_str(),
865 "Injecting fallback event to recover - targeting last hat with task.resume"
866 );
867 Event::new("task.resume", payload).with_target(hat_id.clone())
868 }
869 _ => {
870 debug!("Injecting fallback event to recover - triggering Ralph with task.resume");
871 Event::new(
872 "task.resume",
873 "RECOVERY: Previous iteration did not publish an event. \
874 Review the scratchpad and either dispatch the next task or complete the loop.",
875 )
876 }
877 };
878
879 self.bus.publish(fallback_event);
880 true
881 }
882
883 pub fn build_prompt(&mut self, hat_id: &HatId) -> Option<String> {
897 if hat_id.as_str() == "ralph" {
900 if self.registry.is_empty() {
901 let mut events = self.bus.take_pending(&hat_id.clone());
903 let mut human_events = self.bus.take_human_pending();
904 events.append(&mut human_events);
905
906 let (guidance_events, regular_events): (Vec<_>, Vec<_>) = events
908 .into_iter()
909 .partition(|e| e.topic.as_str() == "human.guidance");
910
911 let events_context = regular_events
912 .iter()
913 .map(|e| Self::format_event(e))
914 .collect::<Vec<_>>()
915 .join("\n");
916
917 self.update_robot_guidance(guidance_events);
919 self.apply_robot_guidance();
920
921 let base_prompt = self.ralph.build_prompt(&events_context, &[]);
923 self.ralph.clear_robot_guidance();
924 let with_skills = self.prepend_auto_inject_skills(base_prompt);
925 let with_scratchpad = self.prepend_scratchpad(with_skills);
926 let final_prompt = self.prepend_ready_tasks(with_scratchpad);
927
928 debug!("build_prompt: routing to HatlessRalph (solo mode)");
929 return Some(final_prompt);
930 } else {
931 let mut all_hat_ids: Vec<HatId> = self.bus.hat_ids().cloned().collect();
933 all_hat_ids.sort_by(|a, b| a.as_str().cmp(b.as_str()));
935
936 let mut all_events = Vec::new();
937 let mut system_events = Vec::new();
938
939 for id in &all_hat_ids {
940 let pending = self.bus.take_pending(id);
941 if pending.is_empty() {
942 continue;
943 }
944
945 let (drop_pending, exhausted_event) = self.check_hat_exhaustion(id, &pending);
946 if drop_pending {
947 if let Some(exhausted_event) = exhausted_event {
949 all_events.push(exhausted_event.clone());
950 system_events.push(exhausted_event);
951 }
952 continue;
953 }
954
955 all_events.extend(pending);
956 }
957
958 let mut human_events = self.bus.take_human_pending();
959 all_events.append(&mut human_events);
960
961 for event in system_events {
964 self.bus.publish(event);
965 }
966
967 let (guidance_events, regular_events): (Vec<_>, Vec<_>) = all_events
969 .into_iter()
970 .partition(|e| e.topic.as_str() == "human.guidance");
971
972 self.update_robot_guidance(guidance_events);
975 self.apply_robot_guidance();
976
977 let effective_regular_events = self.effective_regular_events(®ular_events);
979
980 let active_hat_ids = self.determine_active_hat_ids(®ular_events);
982 self.record_hat_activations(&active_hat_ids);
983 self.state.last_active_hat_ids = active_hat_ids.clone();
984 let active_hats = self.determine_active_hats(®ular_events);
985
986 let events_context = effective_regular_events
988 .iter()
989 .map(|e| Self::format_event(e))
990 .collect::<Vec<_>>()
991 .join("\n");
992
993 let base_prompt = self.ralph.build_prompt(&events_context, &active_hats);
995
996 debug!(
998 "build_prompt: routing to HatlessRalph (multi-hat coordinator mode), active_hats: {:?}",
999 active_hats
1000 .iter()
1001 .map(|h| h.id.as_str())
1002 .collect::<Vec<_>>()
1003 );
1004
1005 self.ralph.clear_robot_guidance();
1007 let with_skills = self.prepend_auto_inject_skills(base_prompt);
1008 let with_scratchpad = self.prepend_scratchpad(with_skills);
1009 let final_prompt = self.prepend_ready_tasks(with_scratchpad);
1010
1011 return Some(final_prompt);
1012 }
1013 }
1014
1015 let events = self.bus.take_pending(&hat_id.clone());
1019 let events_context = events
1020 .iter()
1021 .map(|e| Self::format_event(e))
1022 .collect::<Vec<_>>()
1023 .join("\n");
1024
1025 let hat = self.registry.get(hat_id)?;
1026
1027 debug!(
1029 "build_prompt: hat_id='{}', instructions.is_empty()={}",
1030 hat_id.as_str(),
1031 hat.instructions.is_empty()
1032 );
1033
1034 debug!(
1036 "build_prompt: routing to build_custom_hat() for '{}'",
1037 hat_id.as_str()
1038 );
1039 Some(
1040 self.instruction_builder
1041 .build_custom_hat(hat, &events_context),
1042 )
1043 }
1044
1045 fn update_robot_guidance(&mut self, guidance_events: Vec<Event>) {
1051 if guidance_events.is_empty() {
1052 return;
1053 }
1054
1055 self.persist_guidance_to_scratchpad(&guidance_events);
1057
1058 self.robot_guidance
1059 .extend(guidance_events.into_iter().map(|e| e.payload));
1060 }
1061
1062 fn persist_guidance_to_scratchpad(&self, guidance_events: &[Event]) {
1067 use std::io::Write;
1068
1069 let scratchpad_path = self.scratchpad_path();
1070 let resolved_path = if scratchpad_path.is_relative() {
1071 self.config.core.workspace_root.join(&scratchpad_path)
1072 } else {
1073 scratchpad_path
1074 };
1075
1076 if let Some(parent) = resolved_path.parent()
1078 && !parent.exists()
1079 && let Err(e) = std::fs::create_dir_all(parent)
1080 {
1081 warn!("Failed to create scratchpad directory: {}", e);
1082 return;
1083 }
1084
1085 let mut file = match std::fs::OpenOptions::new()
1086 .create(true)
1087 .append(true)
1088 .open(&resolved_path)
1089 {
1090 Ok(f) => f,
1091 Err(e) => {
1092 warn!("Failed to open scratchpad for guidance persistence: {}", e);
1093 return;
1094 }
1095 };
1096
1097 let timestamp = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC");
1098 for event in guidance_events {
1099 let entry = format!(
1100 "\n### HUMAN GUIDANCE ({})\n\n{}\n",
1101 timestamp, event.payload
1102 );
1103 if let Err(e) = file.write_all(entry.as_bytes()) {
1104 warn!("Failed to write guidance to scratchpad: {}", e);
1105 }
1106 }
1107
1108 info!(
1109 count = guidance_events.len(),
1110 "Persisted human guidance to scratchpad"
1111 );
1112 }
1113
1114 fn apply_robot_guidance(&mut self) {
1116 if self.robot_guidance.is_empty() {
1117 return;
1118 }
1119
1120 self.ralph.set_robot_guidance(self.robot_guidance.clone());
1121 }
1122
1123 fn prepend_auto_inject_skills(&self, prompt: String) -> String {
1133 let mut prefix = String::new();
1134
1135 self.inject_memories_and_tools_skill(&mut prefix);
1137
1138 self.inject_robot_skill(&mut prefix);
1140
1141 self.inject_custom_auto_skills(&mut prefix);
1143
1144 if prefix.is_empty() {
1145 return prompt;
1146 }
1147
1148 prefix.push_str("\n\n");
1149 prefix.push_str(&prompt);
1150 prefix
1151 }
1152
1153 fn inject_memories_and_tools_skill(&self, prefix: &mut String) {
1161 let memories_config = &self.config.memories;
1162
1163 if memories_config.enabled && memories_config.inject == InjectMode::Auto {
1165 info!(
1166 "Memory injection check: enabled={}, inject={:?}, workspace_root={:?}",
1167 memories_config.enabled, memories_config.inject, self.config.core.workspace_root
1168 );
1169
1170 let workspace_root = &self.config.core.workspace_root;
1171 let store = MarkdownMemoryStore::with_default_path(workspace_root);
1172 let memories_path = workspace_root.join(".ralph/agent/memories.md");
1173
1174 info!(
1175 "Looking for memories at: {:?} (exists: {})",
1176 memories_path,
1177 memories_path.exists()
1178 );
1179
1180 let memories = match store.load() {
1181 Ok(memories) => {
1182 info!("Successfully loaded {} memories from store", memories.len());
1183 memories
1184 }
1185 Err(e) => {
1186 info!(
1187 "Failed to load memories for injection: {} (path: {:?})",
1188 e, memories_path
1189 );
1190 Vec::new()
1191 }
1192 };
1193
1194 if memories.is_empty() {
1195 info!("Memory store is empty - no memories to inject");
1196 } else {
1197 let mut memories_content = format_memories_as_markdown(&memories);
1198
1199 if memories_config.budget > 0 {
1200 let original_len = memories_content.len();
1201 memories_content =
1202 truncate_to_budget(&memories_content, memories_config.budget);
1203 debug!(
1204 "Applied budget: {} chars -> {} chars (budget: {})",
1205 original_len,
1206 memories_content.len(),
1207 memories_config.budget
1208 );
1209 }
1210
1211 info!(
1212 "Injecting {} memories ({} chars) into prompt",
1213 memories.len(),
1214 memories_content.len()
1215 );
1216
1217 prefix.push_str(&memories_content);
1218 }
1219 }
1220
1221 let tasks_enabled = self.config.tasks.enabled;
1223
1224 if (memories_config.enabled || tasks_enabled)
1226 && let Some(skill) = self.skill_registry.get("ralph-tools")
1227 {
1228 if !prefix.is_empty() {
1229 prefix.push_str("\n\n");
1230 }
1231 prefix.push_str(&format!(
1232 "<ralph-tools-skill>\n{}\n</ralph-tools-skill>",
1233 skill.content.trim()
1234 ));
1235 debug!("Injected ralph-tools skill from registry");
1236 }
1237
1238 if tasks_enabled && let Some(skill) = self.skill_registry.get("ralph-tools-tasks") {
1240 if !prefix.is_empty() {
1241 prefix.push_str("\n\n");
1242 }
1243 prefix.push_str(&format!(
1244 "<ralph-tools-tasks-skill>\n{}\n</ralph-tools-tasks-skill>",
1245 skill.content.trim()
1246 ));
1247 debug!("Injected ralph-tools-tasks skill from registry");
1248 }
1249
1250 if memories_config.enabled
1252 && let Some(skill) = self.skill_registry.get("ralph-tools-memories")
1253 {
1254 if !prefix.is_empty() {
1255 prefix.push_str("\n\n");
1256 }
1257 prefix.push_str(&format!(
1258 "<ralph-tools-memories-skill>\n{}\n</ralph-tools-memories-skill>",
1259 skill.content.trim()
1260 ));
1261 debug!("Injected ralph-tools-memories skill from registry");
1262 }
1263 }
1264
1265 fn inject_robot_skill(&self, prefix: &mut String) {
1270 if !self.config.robot.enabled {
1271 return;
1272 }
1273
1274 if let Some(skill) = self.skill_registry.get("robot-interaction") {
1275 if !prefix.is_empty() {
1276 prefix.push_str("\n\n");
1277 }
1278 prefix.push_str(&format!(
1279 "<robot-skill>\n{}\n</robot-skill>",
1280 skill.content.trim()
1281 ));
1282 debug!("Injected robot interaction skill from registry");
1283 }
1284 }
1285
1286 fn inject_custom_auto_skills(&self, prefix: &mut String) {
1288 for skill in self.skill_registry.auto_inject_skills(None) {
1289 if matches!(
1291 skill.name.as_str(),
1292 "ralph-tools" | "ralph-tools-tasks" | "ralph-tools-memories" | "robot-interaction"
1293 ) {
1294 continue;
1295 }
1296
1297 if !prefix.is_empty() {
1298 prefix.push_str("\n\n");
1299 }
1300 prefix.push_str(&format!(
1301 "<{name}-skill>\n{content}\n</{name}-skill>",
1302 name = skill.name,
1303 content = skill.content.trim()
1304 ));
1305 debug!("Injected auto-inject skill: {}", skill.name);
1306 }
1307 }
1308
1309 fn prepend_scratchpad(&self, prompt: String) -> String {
1315 let scratchpad_path = self.scratchpad_path();
1316
1317 let resolved_path = if scratchpad_path.is_relative() {
1318 self.config.core.workspace_root.join(&scratchpad_path)
1319 } else {
1320 scratchpad_path
1321 };
1322
1323 if !resolved_path.exists() {
1324 debug!(
1325 "Scratchpad not found at {:?}, skipping injection",
1326 resolved_path
1327 );
1328 return prompt;
1329 }
1330
1331 let content = match std::fs::read_to_string(&resolved_path) {
1332 Ok(c) => c,
1333 Err(e) => {
1334 info!("Failed to read scratchpad for injection: {}", e);
1335 return prompt;
1336 }
1337 };
1338
1339 if content.trim().is_empty() {
1340 debug!("Scratchpad is empty, skipping injection");
1341 return prompt;
1342 }
1343
1344 let char_budget = 4000 * 4;
1346 let content = if content.len() > char_budget {
1347 let start = content.len() - char_budget;
1349 let start = floor_char_boundary(&content, start);
1351 let line_start = content[start..].find('\n').map_or(start, |n| start + n + 1);
1352 let discarded = &content[..line_start];
1353
1354 let headings: Vec<&str> = discarded
1356 .lines()
1357 .filter(|line| line.starts_with('#'))
1358 .collect();
1359 let summary = if headings.is_empty() {
1360 format!(
1361 "<!-- earlier content truncated ({} chars omitted) -->",
1362 line_start
1363 )
1364 } else {
1365 format!(
1366 "<!-- earlier content truncated ({} chars omitted) -->\n\
1367 <!-- discarded sections: {} -->",
1368 line_start,
1369 headings.join(" | ")
1370 )
1371 };
1372
1373 format!("{}\n\n{}", summary, &content[line_start..])
1374 } else {
1375 content
1376 };
1377
1378 info!("Injecting scratchpad ({} chars) into prompt", content.len());
1379
1380 let mut final_prompt = format!(
1381 "<scratchpad path=\"{}\">\n{}\n</scratchpad>\n\n",
1382 self.config.core.scratchpad, content
1383 );
1384 final_prompt.push_str(&prompt);
1385 final_prompt
1386 }
1387
1388 fn prepend_ready_tasks(&self, prompt: String) -> String {
1394 if !self.config.tasks.enabled {
1395 return prompt;
1396 }
1397
1398 use crate::task::TaskStatus;
1399 use crate::task_store::TaskStore;
1400
1401 let tasks_path = self.tasks_path();
1402 let resolved_path = if tasks_path.is_relative() {
1403 self.config.core.workspace_root.join(&tasks_path)
1404 } else {
1405 tasks_path
1406 };
1407
1408 if !resolved_path.exists() {
1409 return prompt;
1410 }
1411
1412 let store = match TaskStore::load(&resolved_path) {
1413 Ok(s) => s,
1414 Err(e) => {
1415 info!("Failed to load task store for injection: {}", e);
1416 return prompt;
1417 }
1418 };
1419
1420 let current_loop_id = self.current_loop_id();
1421
1422 let ready = Self::filter_tasks_by_loop(store.ready(), current_loop_id.as_deref());
1423 let open = Self::filter_tasks_by_loop(store.open(), current_loop_id.as_deref());
1424 let all_count =
1425 Self::filter_tasks_by_loop(store.all().iter().collect(), current_loop_id.as_deref())
1426 .len();
1427 let closed_count = all_count - open.len();
1428
1429 if open.is_empty() && closed_count == 0 {
1430 return prompt;
1431 }
1432
1433 let mut section = String::from("<ready-tasks>\n");
1434 if ready.is_empty() && open.is_empty() {
1435 section.push_str("No open tasks. Create tasks with `ralph tools task add`.\n");
1436 } else {
1437 section.push_str(&format!(
1438 "## Tasks: {} ready, {} open, {} closed\n\n",
1439 ready.len(),
1440 open.len(),
1441 closed_count
1442 ));
1443 for task in &ready {
1444 let status_icon = match task.status {
1445 TaskStatus::Open => "[ ]",
1446 TaskStatus::InProgress => "[~]",
1447 _ => "[?]",
1448 };
1449 section.push_str(&format!(
1450 "- {} [P{}] {} ({}){}\n",
1451 status_icon,
1452 task.priority,
1453 task.title,
1454 task.id,
1455 task.key
1456 .as_deref()
1457 .map(|key| format!(" — key: {key}"))
1458 .unwrap_or_default()
1459 ));
1460 }
1461 let ready_ids: Vec<&str> = ready.iter().map(|t| t.id.as_str()).collect();
1463 let blocked: Vec<_> = open
1464 .iter()
1465 .filter(|t| !ready_ids.contains(&t.id.as_str()))
1466 .collect();
1467 if !blocked.is_empty() {
1468 section.push_str("\nBlocked:\n");
1469 for task in blocked {
1470 section.push_str(&format!(
1471 "- [blocked] [P{}] {} ({}){} — blocked by: {}\n",
1472 task.priority,
1473 task.title,
1474 task.id,
1475 task.key
1476 .as_deref()
1477 .map(|key| format!(" — key: {key}"))
1478 .unwrap_or_default(),
1479 task.blocked_by.join(", ")
1480 ));
1481 }
1482 }
1483 }
1484 section.push_str("</ready-tasks>\n\n");
1485
1486 info!(
1487 "Injecting ready tasks ({} ready, {} open, {} closed) into prompt",
1488 ready.len(),
1489 open.len(),
1490 closed_count
1491 );
1492
1493 let mut final_prompt = section;
1494 final_prompt.push_str(&prompt);
1495 final_prompt
1496 }
1497
1498 pub fn build_ralph_prompt(&self, prompt_content: &str) -> String {
1500 self.ralph.build_prompt(prompt_content, &[])
1501 }
1502
1503 fn determine_active_hats(&self, events: &[Event]) -> Vec<&Hat> {
1506 let mut active_hats = Vec::new();
1507 for id in self.determine_active_hat_ids(events) {
1508 if let Some(hat) = self.registry.get(&id) {
1509 active_hats.push(hat);
1510 }
1511 }
1512 active_hats
1513 }
1514
1515 fn determine_active_hat_ids(&self, events: &[Event]) -> Vec<HatId> {
1516 let mut entrypoint_hat_ids = Vec::new();
1517 let mut progressed_hat_ids = Vec::new();
1518 for event in events {
1519 let hat_id = if let Some(target) = &event.target
1521 && self.registry.get(target).is_some()
1522 {
1523 target.clone()
1524 } else if let Some(hat) = self.registry.get_for_topic(event.topic.as_str()) {
1525 hat.id.clone()
1526 } else {
1527 continue;
1528 };
1529
1530 let list = if self.is_entrypoint_topic(event.topic.as_str()) {
1531 &mut entrypoint_hat_ids
1532 } else {
1533 &mut progressed_hat_ids
1534 };
1535 if !list.iter().any(|id| id == &hat_id) {
1536 list.push(hat_id);
1537 }
1538 }
1539 if progressed_hat_ids.is_empty() {
1545 entrypoint_hat_ids
1546 } else {
1547 progressed_hat_ids
1548 }
1549 }
1550
1551 fn effective_regular_events<'a>(&self, events: &'a [Event]) -> Vec<&'a Event> {
1552 let has_downstream_event = events
1553 .iter()
1554 .any(|event| !Self::is_kickoff_or_recovery_event(event.topic.as_str()));
1555 events
1556 .iter()
1557 .filter(|event| {
1558 !has_downstream_event || !Self::is_kickoff_or_recovery_event(event.topic.as_str())
1559 })
1560 .collect()
1561 }
1562
1563 fn is_kickoff_or_recovery_event(topic: &str) -> bool {
1564 topic == "task.start" || topic == "task.resume" || topic.strip_suffix(".start").is_some()
1565 }
1566
1567 fn is_entrypoint_topic(&self, topic: &str) -> bool {
1568 topic == "task.start"
1569 || topic == "task.resume"
1570 || topic.strip_suffix(".start").is_some()
1571 || self.config.event_loop.starting_event.as_deref() == Some(topic)
1572 }
1573
1574 fn peek_pending_regular_events(&self) -> Vec<Event> {
1575 let mut events = Vec::new();
1576 for hat_id in self.bus.hat_ids() {
1577 if let Some(pending) = self.bus.peek_pending(hat_id) {
1578 events.extend(pending.iter().cloned());
1579 }
1580 }
1581 events
1582 }
1583
1584 fn format_event(event: &Event) -> String {
1589 let topic = &event.topic;
1590 let payload = &event.payload;
1591
1592 if topic.as_str() == "task.start" || topic.as_str() == "task.resume" {
1593 format!(
1594 "Event: {} - <top-level-prompt>\n{}\n</top-level-prompt>",
1595 topic, payload
1596 )
1597 } else {
1598 format!("Event: {} - {}", topic, payload)
1599 }
1600 }
1601
1602 fn check_hat_exhaustion(&mut self, hat_id: &HatId, dropped: &[Event]) -> (bool, Option<Event>) {
1603 let Some(config) = self.registry.get_config(hat_id) else {
1604 return (false, None);
1605 };
1606 let Some(max) = config.max_activations else {
1607 return (false, None);
1608 };
1609
1610 let count = *self.state.hat_activation_counts.get(hat_id).unwrap_or(&0);
1611 if count < max {
1612 return (false, None);
1613 }
1614
1615 let should_emit = self.state.exhausted_hats.insert(hat_id.clone());
1617
1618 if !should_emit {
1619 return (true, None);
1621 }
1622
1623 let mut dropped_topics: Vec<String> = dropped.iter().map(|e| e.topic.to_string()).collect();
1624 dropped_topics.sort();
1625
1626 let payload = format!(
1627 "Hat '{hat}' exhausted.\n- max_activations: {max}\n- activations: {count}\n- dropped_topics:\n - {topics}",
1628 hat = hat_id.as_str(),
1629 max = max,
1630 count = count,
1631 topics = dropped_topics.join("\n - ")
1632 );
1633
1634 warn!(
1635 hat = %hat_id.as_str(),
1636 max_activations = max,
1637 activations = count,
1638 "Hat exhausted (max_activations reached)"
1639 );
1640
1641 (
1642 true,
1643 Some(Event::new(
1644 format!("{}.exhausted", hat_id.as_str()),
1645 payload,
1646 )),
1647 )
1648 }
1649
1650 fn record_hat_activations(&mut self, active_hat_ids: &[HatId]) {
1651 for hat_id in active_hat_ids {
1652 *self
1653 .state
1654 .hat_activation_counts
1655 .entry(hat_id.clone())
1656 .or_insert(0) += 1;
1657 }
1658 }
1659
1660 pub fn get_active_hat_id(&self) -> HatId {
1664 let pending_events = self.peek_pending_regular_events();
1665 if let Some(active_hat_id) = self
1666 .determine_active_hat_ids(&pending_events)
1667 .into_iter()
1668 .next()
1669 {
1670 return active_hat_id;
1671 }
1672 HatId::new("ralph")
1673 }
1674
1675 pub fn check_default_publishes(&mut self, hat_id: &HatId) {
1685 if let Some(config) = self.registry.get_config(hat_id)
1686 && let Some(default_topic) = &config.default_publishes
1687 {
1688 let default_event = Event::new(default_topic.as_str(), "").with_source(hat_id.clone());
1689
1690 debug!(
1691 hat = %hat_id.as_str(),
1692 topic = %default_topic,
1693 "No events written by hat, injecting default_publishes event"
1694 );
1695
1696 self.state.record_event(&default_event);
1697
1698 if default_topic.as_str() == self.config.event_loop.completion_promise {
1702 info!(
1703 hat = %hat_id.as_str(),
1704 topic = %default_topic,
1705 "default_publishes matches completion_promise — requesting termination"
1706 );
1707 self.state.completion_requested = true;
1708 }
1709
1710 self.bus.publish(default_event);
1711 }
1712 }
1713
1714 pub fn bus(&mut self) -> &mut EventBus {
1719 &mut self.bus
1720 }
1721
1722 pub fn process_output(
1726 &mut self,
1727 hat_id: &HatId,
1728 output: &str,
1729 success: bool,
1730 ) -> Option<TerminationReason> {
1731 self.state.iteration += 1;
1732 self.state.last_hat = Some(hat_id.clone());
1733
1734 if let Some(interval_secs) = self.config.robot.checkin_interval_seconds
1736 && let Some(ref robot_service) = self.robot_service
1737 {
1738 let elapsed = self.state.elapsed();
1739 let interval = std::time::Duration::from_secs(interval_secs);
1740 let last = self
1741 .state
1742 .last_checkin_at
1743 .map(|t| t.elapsed())
1744 .unwrap_or(elapsed);
1745
1746 if last >= interval {
1747 let context = self.build_checkin_context(hat_id);
1748 match robot_service.send_checkin(self.state.iteration, elapsed, Some(&context)) {
1749 Ok(_) => {
1750 self.state.last_checkin_at = Some(std::time::Instant::now());
1751 debug!(iteration = self.state.iteration, "Sent robot check-in");
1752 }
1753 Err(e) => {
1754 warn!(error = %e, "Failed to send robot check-in");
1755 }
1756 }
1757 }
1758 }
1759
1760 self.diagnostics.log_orchestration(
1762 self.state.iteration,
1763 "loop",
1764 crate::diagnostics::OrchestrationEvent::IterationStarted,
1765 );
1766
1767 self.diagnostics.log_orchestration(
1769 self.state.iteration,
1770 "loop",
1771 crate::diagnostics::OrchestrationEvent::HatSelected {
1772 hat: hat_id.to_string(),
1773 reason: "process_output".to_string(),
1774 },
1775 );
1776
1777 if success {
1779 self.state.consecutive_failures = 0;
1780 } else {
1781 self.state.consecutive_failures += 1;
1782 }
1783
1784 let _ = output;
1785
1786 self.audit_file_modifications(hat_id);
1789
1790 self.check_termination()
1796 }
1797
1798 fn audit_file_modifications(&mut self, hat_id: &HatId) {
1804 let config = match self.registry.get_config(hat_id) {
1805 Some(c) => c,
1806 None => return,
1807 };
1808
1809 let has_write_restriction = config
1810 .disallowed_tools
1811 .iter()
1812 .any(|t| t == "Edit" || t == "Write");
1813
1814 if !has_write_restriction {
1815 return;
1816 }
1817
1818 let workspace = &self.config.core.workspace_root;
1819 let diff_output = std::process::Command::new("git")
1820 .args(["diff", "--stat", "HEAD"])
1821 .current_dir(workspace)
1822 .output();
1823
1824 match diff_output {
1825 Ok(output) if !output.stdout.is_empty() => {
1826 let diff_stat = String::from_utf8_lossy(&output.stdout).trim().to_string();
1827 warn!(
1828 hat = %hat_id.as_str(),
1829 diff = %diff_stat,
1830 "Hat modified files despite tool restrictions (scope violation)"
1831 );
1832
1833 let violation_topic = format!("{}.scope_violation", hat_id.as_str());
1834 let violation = Event::new(
1835 violation_topic.as_str(),
1836 format!(
1837 "Hat '{}' modified files with Edit/Write disallowed:\n{}",
1838 hat_id.as_str(),
1839 diff_stat
1840 ),
1841 );
1842 self.bus.publish(violation);
1843 }
1844 Err(e) => {
1845 debug!(error = %e, "Could not run git diff for file-modification audit");
1846 }
1847 _ => {} }
1849 }
1850
1851 fn extract_task_id(payload: &str) -> String {
1854 payload
1855 .lines()
1856 .next()
1857 .unwrap_or("unknown")
1858 .trim()
1859 .to_string()
1860 }
1861
1862 pub fn add_cost(&mut self, cost: f64) {
1864 self.state.cumulative_cost += cost;
1865 }
1866
1867 fn verify_scratchpad_complete(&self) -> Result<bool, std::io::Error> {
1874 let scratchpad_path = self.scratchpad_path();
1875
1876 if !scratchpad_path.exists() {
1877 return Err(std::io::Error::new(
1878 std::io::ErrorKind::NotFound,
1879 "Scratchpad does not exist",
1880 ));
1881 }
1882
1883 let content = std::fs::read_to_string(scratchpad_path)?;
1884
1885 let has_pending = content
1886 .lines()
1887 .any(|line| line.trim_start().starts_with("- [ ]"));
1888
1889 Ok(!has_pending)
1890 }
1891
1892 fn current_loop_id(&self) -> Option<String> {
1897 self.loop_context
1898 .as_ref()
1899 .and_then(|ctx| {
1900 let marker_path = ctx.ralph_dir().join("current-loop-id");
1901 std::fs::read_to_string(&marker_path).ok()
1902 })
1903 .map(|id| id.trim().to_string())
1904 .filter(|id| !id.is_empty())
1905 }
1906
1907 fn filter_tasks_by_loop<'a>(
1909 tasks: Vec<&'a crate::task::Task>,
1910 loop_id: Option<&str>,
1911 ) -> Vec<&'a crate::task::Task> {
1912 match loop_id {
1913 Some(id) => tasks
1914 .into_iter()
1915 .filter(|t| t.loop_id.as_deref() == Some(id))
1916 .collect(),
1917 None => tasks,
1918 }
1919 }
1920
1921 fn verify_tasks_complete(&self) -> Result<bool, std::io::Error> {
1922 use crate::task_store::TaskStore;
1923
1924 let tasks_path = self.tasks_path();
1925
1926 if !tasks_path.exists() {
1928 return Ok(true);
1929 }
1930
1931 let store = TaskStore::load(&tasks_path)?;
1932 let current_loop_id = self.current_loop_id();
1933 let open = Self::filter_tasks_by_loop(store.open(), current_loop_id.as_deref());
1934 Ok(open.is_empty())
1935 }
1936
1937 fn build_checkin_context(&self, hat_id: &HatId) -> CheckinContext {
1939 let (open_tasks, closed_tasks) = self.count_tasks();
1940 CheckinContext {
1941 current_hat: Some(hat_id.as_str().to_string()),
1942 open_tasks,
1943 closed_tasks,
1944 cumulative_cost: self.state.cumulative_cost,
1945 }
1946 }
1947
1948 fn count_tasks(&self) -> (usize, usize) {
1953 use crate::task_store::TaskStore;
1954
1955 let tasks_path = self.tasks_path();
1956 if !tasks_path.exists() {
1957 return (0, 0);
1958 }
1959
1960 match TaskStore::load(&tasks_path) {
1961 Ok(store) => {
1962 let current_loop_id = self.current_loop_id();
1963 let all = Self::filter_tasks_by_loop(
1964 store.all().iter().collect(),
1965 current_loop_id.as_deref(),
1966 );
1967 let open = Self::filter_tasks_by_loop(store.open(), current_loop_id.as_deref());
1968 let closed = all.len() - open.len();
1969 (open.len(), closed)
1970 }
1971 Err(_) => (0, 0),
1972 }
1973 }
1974
1975 fn get_open_task_list(&self) -> Vec<String> {
1977 use crate::task_store::TaskStore;
1978
1979 let tasks_path = self.tasks_path();
1980 if let Ok(store) = TaskStore::load(&tasks_path) {
1981 let current_loop_id = self.current_loop_id();
1982 let open = Self::filter_tasks_by_loop(store.open(), current_loop_id.as_deref());
1983 return open
1984 .iter()
1985 .map(|t| format!("{}: {}", t.id, t.title))
1986 .collect();
1987 }
1988 vec![]
1989 }
1990
1991 fn warn_on_mutation_evidence(&self, evidence: &crate::event_parser::BackpressureEvidence) {
1992 let threshold = self.config.event_loop.mutation_score_warn_threshold;
1993
1994 match &evidence.mutants {
1995 Some(mutants) => {
1996 if let Some(reason) = Self::mutation_warning_reason(mutants, threshold) {
1997 warn!(
1998 reason = %reason,
1999 mutants_status = ?mutants.status,
2000 mutants_score = mutants.score_percent,
2001 mutants_threshold = threshold,
2002 "Mutation testing warning"
2003 );
2004 }
2005 }
2006 None => {
2007 if let Some(threshold) = threshold {
2008 warn!(
2009 mutants_threshold = threshold,
2010 "Mutation testing warning: missing mutation evidence in build.done payload"
2011 );
2012 }
2013 }
2014 }
2015 }
2016
2017 fn mutation_warning_reason(
2018 mutants: &MutationEvidence,
2019 threshold: Option<f64>,
2020 ) -> Option<String> {
2021 match mutants.status {
2022 MutationStatus::Fail => Some("mutation testing failed".to_string()),
2023 MutationStatus::Warn => Some(Self::format_mutation_message(
2024 "mutation score below threshold",
2025 mutants.score_percent,
2026 )),
2027 MutationStatus::Unknown => Some("mutation testing status unknown".to_string()),
2028 MutationStatus::Pass => {
2029 let threshold = threshold?;
2030
2031 match mutants.score_percent {
2032 Some(score) if score < threshold => Some(format!(
2033 "mutation score {:.2}% below threshold {:.2}%",
2034 score, threshold
2035 )),
2036 Some(_) => None,
2037 None => Some(format!(
2038 "mutation score missing (threshold {:.2}%)",
2039 threshold
2040 )),
2041 }
2042 }
2043 }
2044 }
2045
2046 fn format_mutation_message(message: &str, score: Option<f64>) -> String {
2047 match score {
2048 Some(score) => format!("{message} ({score:.2}%)"),
2049 None => message.to_string(),
2050 }
2051 }
2052
2053 fn parse_human_interact_context(payload: &str) -> Value {
2054 let mut context = match serde_json::from_str::<Value>(payload) {
2055 Ok(Value::Object(map)) => map,
2056 Ok(value) => {
2057 let mut map = Map::new();
2058 map.insert("question".to_string(), value);
2059 map
2060 }
2061 Err(_) => {
2062 let mut map = Map::new();
2063 map.insert("question".to_string(), Value::String(payload.to_string()));
2064 map
2065 }
2066 };
2067
2068 if !context.contains_key("question") {
2069 context.insert("question".to_string(), Value::String(payload.to_string()));
2070 }
2071
2072 Value::Object(context)
2073 }
2074
2075 fn is_restart_request_payload(payload: &str) -> bool {
2076 let payload = payload.to_ascii_lowercase();
2077 payload.contains("restart yourself") || payload.contains("restart ralph")
2078 }
2079
2080 fn is_restart_request_event(event: &Event) -> bool {
2081 matches!(event.topic.as_str(), "human.response" | "user.prompt")
2082 && Self::is_restart_request_payload(&event.payload)
2083 }
2084
2085 fn mark_restart_requested(&self, source: &str) {
2086 let restart_path =
2087 std::path::Path::new(&self.config.core.workspace_root).join(".ralph/restart-requested");
2088
2089 if let Some(parent) = restart_path.parent()
2090 && let Err(err) = std::fs::create_dir_all(parent)
2091 {
2092 warn!(
2093 error = %err,
2094 path = %parent.display(),
2095 "Failed to create restart-requested parent directory"
2096 );
2097 return;
2098 }
2099
2100 if let Err(err) = std::fs::write(&restart_path, source) {
2101 warn!(
2102 error = %err,
2103 path = %restart_path.display(),
2104 "Failed to write restart-requested signal"
2105 );
2106 return;
2107 }
2108
2109 info!(
2110 source,
2111 path = %restart_path.display(),
2112 "Restart requested from human text"
2113 );
2114 }
2115
2116 pub fn process_events_from_jsonl(&mut self) -> std::io::Result<ProcessedEvents> {
2128 let result = self.event_reader.read_new_events()?;
2129 self.process_parse_result(result)
2130 }
2131
    /// Validates a batch of parsed JSONL events and publishes the accepted ones to the bus.
    ///
    /// Processing order:
    /// 1. Each malformed line is published as an `event.malformed` event and increments
    ///    `consecutive_malformed_events`. Any well-formed event resets that counter.
    /// 2. An empty batch (no events and no malformed lines) returns early with every flag
    ///    `false`.
    /// 3. With `enforce_hat_scope`, events that none of the last-active hats may publish are
    ///    dropped, and a `<hat>.scope_violation` event is published for each.
    /// 4. The cancellation topic sets `cancellation_requested`. The completion topic sets
    ///    `completion_requested`, but only when it is the last event in the batch. Neither
    ///    is forwarded to the bus.
    /// 5. `build.done`, non-wave `review.done` and `verify.passed` are gated on evidence in
    ///    their payloads. When that evidence is missing or failing, they are replaced by
    ///    `build.blocked`, `review.blocked` or `verify.failed` respectively.
    /// 6. `build.blocked` events update per-task block counts and may abandon a task.
    /// 7. The first `human.interact` event is sent through the robot service, if one is
    ///    configured. Waiting for the reply may produce a `human.response` or
    ///    `human.timeout` event.
    /// 8. Restart requests in human text are signalled. All validated events are then
    ///    recorded, logged and published, followed by any robot response event.
    ///
    /// # Errors
    /// The signature allows an I/O error, but every path in this body returns `Ok`.
    fn process_parse_result(
        &mut self,
        result: crate::event_reader::ParseResult,
    ) -> std::io::Result<ProcessedEvents> {
        // Surface every malformed line on the bus and track the malformed streak.
        for malformed in &result.malformed {
            let payload = format!(
                "Line {}: {}\nContent: {}",
                malformed.line_number, malformed.error, &malformed.content
            );
            let event = Event::new("event.malformed", &payload);
            self.bus.publish(event);
            self.state.consecutive_malformed_events += 1;
            warn!(
                line = malformed.line_number,
                consecutive = self.state.consecutive_malformed_events,
                "Malformed event line detected"
            );
        }

        // At least one well-formed event breaks the malformed streak.
        if !result.events.is_empty() {
            self.state.consecutive_malformed_events = 0;
        }

        if result.events.is_empty() && result.malformed.is_empty() {
            return Ok(ProcessedEvents {
                had_events: false,
                had_plan_events: false,
                human_interact_context: None,
                has_orphans: false,
            });
        }

        // Optional hat-scope enforcement: keep only events that some last-active hat is
        // allowed to publish.
        let events = if self.config.event_loop.enforce_hat_scope {
            let active_hats = self.state.last_active_hat_ids.clone();
            let (in_scope, out_of_scope): (Vec<_>, Vec<_>) =
                result.events.into_iter().partition(|event| {
                    if active_hats.is_empty() {
                        return true; // No recorded active hats: nothing to enforce against.
                    }
                    active_hats
                        .iter()
                        .any(|hat_id| self.registry.can_publish(hat_id, event.topic.as_str()))
                });

            // Each dropped event is reported under the first active hat (or "unknown").
            for event in &out_of_scope {
                let violation_hat = active_hats.first().map(|h| h.as_str()).unwrap_or("unknown");
                warn!(
                    active_hats = ?active_hats,
                    topic = %event.topic,
                    "Scope violation: active hat(s) cannot publish this topic — dropping event"
                );
                let violation_topic = format!("{}.scope_violation", violation_hat);
                let violation_payload = format!(
                    "Attempted to publish '{}': {}",
                    event.topic,
                    event.payload.clone().unwrap_or_default()
                );
                let violation = Event::new(violation_topic, violation_payload);
                self.bus.publish(violation);
            }

            in_scope
        } else {
            result.events
        };
        // Set later if any published event topic has no subscriber in the registry.
        let mut has_orphans = false;

        let mut validated_events = Vec::new();
        let completion_topic = self.config.event_loop.completion_promise.as_str();
        let cancellation_topic = self.config.event_loop.cancellation_promise.clone();
        let total_events = events.len();
        for (index, event) in events.into_iter().enumerate() {
            let payload = event.payload.clone().unwrap_or_default();

            // Cancellation (only when a cancellation topic is configured) is consumed here,
            // not forwarded to the bus.
            if !cancellation_topic.is_empty() && event.topic.as_str() == cancellation_topic {
                info!(
                    payload = %payload,
                    "loop.cancel event detected — scheduling graceful termination"
                );
                self.state.cancellation_requested = true;
                continue;
            }

            // Completion is honored only as the final event of the batch, so work emitted
            // after it in the same batch is never silently skipped. It is never forwarded.
            if event.topic == completion_topic {
                if index + 1 == total_events {
                    self.state.completion_requested = true;
                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::EventPublished {
                            topic: event.topic.clone(),
                        },
                    );
                    info!(
                        topic = %event.topic,
                        "Completion event detected in JSONL"
                    );
                } else {
                    warn!(
                        topic = %event.topic,
                        index = index,
                        total_events = total_events,
                        "Completion event ignored because it was not the last event"
                    );
                }
                continue;
            }

            // build.done gate: requires parseable backpressure evidence with all checks
            // passing; otherwise a build.blocked event is substituted.
            if event.topic == "build.done" {
                if let Some(evidence) = EventParser::parse_backpressure_evidence(&payload) {
                    if evidence.all_passed() {
                        self.warn_on_mutation_evidence(&evidence);
                        validated_events.push(Event::new(event.topic.as_str(), &payload));
                    } else {
                        warn!(
                            tests = evidence.tests_passed,
                            lint = evidence.lint_passed,
                            typecheck = evidence.typecheck_passed,
                            audit = evidence.audit_passed,
                            coverage = evidence.coverage_passed,
                            complexity = evidence.complexity_score,
                            duplication = evidence.duplication_passed,
                            performance = evidence.performance_regression,
                            specs = evidence.specs_verified,
                            "build.done rejected: backpressure checks failed"
                        );

                        // Human-readable renderings of the optional evidence fields.
                        let complexity = evidence
                            .complexity_score
                            .map(|value| format!("{value:.2}"))
                            .unwrap_or_else(|| "missing".to_string());
                        let performance = match evidence.performance_regression {
                            Some(true) => "regression".to_string(),
                            Some(false) => "pass".to_string(),
                            None => "missing".to_string(),
                        };
                        let specs = match evidence.specs_verified {
                            Some(true) => "pass".to_string(),
                            Some(false) => "fail".to_string(),
                            None => "not reported".to_string(),
                        };

                        self.diagnostics.log_orchestration(
                            self.state.iteration,
                            "jsonl",
                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                                reason: format!(
                                    "backpressure checks failed: tests={}, lint={}, typecheck={}, audit={}, coverage={}, complexity={}, duplication={}, performance={}, specs={}",
                                    evidence.tests_passed,
                                    evidence.lint_passed,
                                    evidence.typecheck_passed,
                                    evidence.audit_passed,
                                    evidence.coverage_passed,
                                    complexity,
                                    evidence.duplication_passed,
                                    performance,
                                    specs
                                ),
                            },
                        );

                        validated_events.push(Event::new(
                            "build.blocked",
                            "Backpressure checks failed. Fix tests/lint/typecheck/audit/coverage/complexity/duplication/specs before emitting build.done.",
                        ));
                    }
                } else {
                    warn!("build.done rejected: missing backpressure evidence");

                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                            reason: "missing backpressure evidence".to_string(),
                        },
                    );

                    validated_events.push(Event::new(
                        "build.blocked",
                        "Missing backpressure evidence. Include 'tests: pass', 'lint: pass', 'typecheck: pass', 'audit: pass', 'coverage: pass', 'complexity: <score>', 'duplication: pass', 'performance: pass' (optional), 'specs: pass' (optional) in build.done payload.",
                    ));
                }
            // review.done gate (wave events bypass it): requires verified tests/build
            // evidence; otherwise a review.blocked event is substituted.
            } else if event.topic == "review.done" && !event.is_wave_event() {
                if let Some(evidence) = EventParser::parse_review_evidence(&payload) {
                    if evidence.is_verified() {
                        validated_events.push(Event::new(event.topic.as_str(), &payload));
                    } else {
                        warn!(
                            tests = evidence.tests_passed,
                            build = evidence.build_passed,
                            "review.done rejected: verification checks failed"
                        );

                        self.diagnostics.log_orchestration(
                            self.state.iteration,
                            "jsonl",
                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                                reason: format!(
                                    "review verification failed: tests={}, build={}",
                                    evidence.tests_passed, evidence.build_passed
                                ),
                            },
                        );

                        validated_events.push(Event::new(
                            "review.blocked",
                            "Review verification failed. Run tests and build before emitting review.done.",
                        ));
                    }
                } else {
                    warn!("review.done rejected: missing verification evidence");

                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                            reason: "missing review verification evidence".to_string(),
                        },
                    );

                    validated_events.push(Event::new(
                        "review.blocked",
                        "Missing verification evidence. Include 'tests: pass' and 'build: pass' in review.done payload.",
                    ));
                }
            // verify.passed gate: requires a quality report that meets its thresholds;
            // otherwise a verify.failed event is substituted.
            } else if event.topic == "verify.passed" {
                if let Some(report) = EventParser::parse_quality_report(&payload) {
                    if report.meets_thresholds() {
                        validated_events.push(Event::new(event.topic.as_str(), &payload));
                    } else {
                        let failed = report.failed_dimensions();
                        let reason = if failed.is_empty() {
                            "quality thresholds failed".to_string()
                        } else {
                            format!("quality thresholds failed: {}", failed.join(", "))
                        };

                        warn!(
                            failed_dimensions = ?failed,
                            "verify.passed rejected: quality thresholds failed"
                        );

                        self.diagnostics.log_orchestration(
                            self.state.iteration,
                            "jsonl",
                            crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                                reason,
                            },
                        );

                        validated_events.push(Event::new(
                            "verify.failed",
                            "Quality thresholds failed. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity with thresholds in verify.passed payload.",
                        ));
                    }
                } else {
                    warn!("verify.passed rejected: missing quality report");

                    self.diagnostics.log_orchestration(
                        self.state.iteration,
                        "jsonl",
                        crate::diagnostics::OrchestrationEvent::BackpressureTriggered {
                            reason: "missing quality report".to_string(),
                        },
                    );

                    validated_events.push(Event::new(
                        "verify.failed",
                        "Missing quality report. Include quality.tests, quality.coverage, quality.lint, quality.audit, quality.mutation, quality.complexity in verify.passed payload.",
                    ));
                }
            // verify.failed always passes through; a missing report is only logged.
            } else if event.topic == "verify.failed" {
                if EventParser::parse_quality_report(&payload).is_none() {
                    warn!("verify.failed missing quality report");
                }
                validated_events.push(Event::new(event.topic.as_str(), &payload));
            } else {
                // All other topics are forwarded unchanged.
                validated_events.push(Event::new(event.topic.as_str(), &payload));
            }
        }

        // Covers both synthesized and hat-emitted build.blocked events.
        let blocked_events: Vec<_> = validated_events
            .iter()
            .filter(|e| e.topic == "build.blocked".into())
            .collect();

        // Per-task block counting. The "task id" is the first payload line.
        // NOTE(review): the build.blocked payloads synthesized above are fixed strings, so
        // every synthesized block of the same kind shares a single counter. This method
        // also never resets `task_block_counts`, although the messages say "consecutive".
        // Confirm both are intended, or whether the counter is reset elsewhere.
        for blocked_event in &blocked_events {
            let task_id = Self::extract_task_id(&blocked_event.payload);

            let count = self
                .state
                .task_block_counts
                .entry(task_id.clone())
                .or_insert(0);
            *count += 1;

            debug!(
                task_id = %task_id,
                block_count = *count,
                "Task blocked"
            );

            // Abandon at most once per task id, after 3 blocks.
            if *count >= 3 && !self.state.abandoned_tasks.contains(&task_id) {
                warn!(
                    task_id = %task_id,
                    "Task abandoned after 3 consecutive blocks"
                );

                self.state.abandoned_tasks.push(task_id.clone());

                self.diagnostics.log_orchestration(
                    self.state.iteration,
                    "jsonl",
                    crate::diagnostics::OrchestrationEvent::TaskAbandoned {
                        reason: format!(
                            "3 consecutive build.blocked events for task '{}'",
                            task_id
                        ),
                    },
                );

                let abandoned_event = Event::new(
                    "build.task.abandoned",
                    format!(
                        "Task '{}' abandoned after 3 consecutive build.blocked events",
                        task_id
                    ),
                );

                self.bus.publish(abandoned_event);
            }
        }

        let has_blocked_event = !blocked_events.is_empty();

        // A batch without any build.blocked resets the blocked streak and the blocked hat.
        if has_blocked_event {
            self.state.consecutive_blocked += 1;
        } else {
            self.state.consecutive_blocked = 0;
            self.state.last_blocked_hat = None;
        }

        // Robot-mediated human interaction: only the first human.interact event is handled.
        let mut response_event = None;
        let mut human_interact_context = None;
        let ask_human_idx = validated_events
            .iter()
            .position(|e| e.topic == "human.interact".into());

        if let Some(idx) = ask_human_idx {
            let ask_event = &validated_events[idx];
            let payload = ask_event.payload.clone();

            let mut context = match Self::parse_human_interact_context(&payload) {
                Value::Object(map) => map,
                _ => Map::new(),
            };

            if let Some(ref robot_service) = self.robot_service {
                info!(
                    payload = %payload,
                    "human.interact event detected — sending question via robot service"
                );

                // A send failure is recorded in `context` and treated like a timeout.
                let send_ok = match robot_service.send_question(&payload) {
                    Ok(_message_id) => true,
                    Err(e) => {
                        warn!(
                            error = %e,
                            "Failed to send human.interact question after retries — treating as timeout"
                        );
                        // NOTE(review): retry_count is hard-coded to 3 here; presumably it
                        // mirrors the robot service's retry policy — confirm.
                        self.diagnostics.log_error(
                            self.state.iteration,
                            "telegram",
                            crate::diagnostics::DiagnosticError::TelegramSendError {
                                operation: "send_question".to_string(),
                                error: e.to_string(),
                                retry_count: 3,
                            },
                        );
                        context.insert(
                            "outcome".to_string(),
                            Value::String("send_failure".to_string()),
                        );
                        context.insert("error".to_string(), Value::String(e.to_string()));
                        false
                    }
                };

                if send_ok {
                    // Resolve the events file to watch for the reply. Order: the loop
                    // context's current-events marker (joined onto its workspace), then
                    // `.ralph/current-events` (relative to the process working directory,
                    // not the workspace root), then the loop context's events path, and
                    // finally `.ralph/events.jsonl`.
                    let events_path = self
                        .loop_context
                        .as_ref()
                        .and_then(|ctx| {
                            std::fs::read_to_string(ctx.current_events_marker())
                                .ok()
                                .map(|s| ctx.workspace().join(s.trim()))
                        })
                        .or_else(|| {
                            std::fs::read_to_string(".ralph/current-events")
                                .ok()
                                .map(|s| PathBuf::from(s.trim()))
                        })
                        .unwrap_or_else(|| {
                            self.loop_context
                                .as_ref()
                                .map(|ctx| ctx.events_path())
                                .unwrap_or_else(|| PathBuf::from(".ralph/events.jsonl"))
                        });

                    match robot_service.wait_for_response(&events_path) {
                        Ok(Some(response)) => {
                            info!(
                                response = %response,
                                "Received human.response — continuing loop"
                            );
                            context.insert(
                                "outcome".to_string(),
                                Value::String("response".to_string()),
                            );
                            context.insert("response".to_string(), Value::String(response.clone()));
                            response_event = Some(Event::new("human.response", &response));
                        }
                        Ok(None) => {
                            warn!(
                                timeout_secs = robot_service.timeout_secs(),
                                "Human response timeout — injecting human.timeout event"
                            );
                            context.insert(
                                "outcome".to_string(),
                                Value::String("timeout".to_string()),
                            );
                            context.insert(
                                "timeout_seconds".to_string(),
                                Value::from(robot_service.timeout_secs()),
                            );
                            let timeout_event = Event::new(
                                "human.timeout",
                                format!(
                                    "No response after {}s. Original question: {}",
                                    robot_service.timeout_secs(),
                                    payload
                                ),
                            );
                            response_event = Some(timeout_event);
                        }
                        Err(e) => {
                            // Wait errors are surfaced to hats as a human.timeout as well.
                            warn!(
                                error = %e,
                                "Error waiting for human response — injecting human.timeout event"
                            );
                            context.insert(
                                "outcome".to_string(),
                                Value::String("wait_error".to_string()),
                            );
                            context.insert("error".to_string(), Value::String(e.to_string()));
                            let timeout_event = Event::new(
                                "human.timeout",
                                format!(
                                    "Error waiting for response: {}. Original question: {}",
                                    e, payload
                                ),
                            );
                            response_event = Some(timeout_event);
                        }
                    }
                }
            } else {
                debug!(
                    "human.interact event detected but no robot service active — passing through"
                );
                context.insert(
                    "outcome".to_string(),
                    Value::String("no_robot_service".to_string()),
                );
            }

            human_interact_context = Some(Value::Object(context));
        }

        // A restart may be requested in any forwarded human text or in the robot response.
        let restart_requested = validated_events.iter().any(Self::is_restart_request_event)
            || response_event
                .as_ref()
                .is_some_and(Self::is_restart_request_event);
        if restart_requested {
            self.mark_restart_requested("human_text");
        }

        let had_events = !validated_events.is_empty();
        let had_plan_events = validated_events
            .iter()
            .any(|event| event.topic.as_str().starts_with("plan."));

        // Record, log and publish every validated event, noting topics with no subscriber.
        for event in validated_events {
            self.state.record_event(&event);

            self.diagnostics.log_orchestration(
                self.state.iteration,
                "jsonl",
                crate::diagnostics::OrchestrationEvent::EventPublished {
                    topic: event.topic.to_string(),
                },
            );

            if !self.registry.has_subscriber(event.topic.as_str()) {
                has_orphans = true;
            }

            debug!(
                topic = %event.topic,
                "Publishing event from JSONL"
            );
            self.bus.publish(event);
        }

        // The robot response (or timeout) is published after the batch. It is not
        // included in the orphan check above.
        if let Some(response) = response_event {
            self.state.record_event(&response);
            info!(
                topic = %response.topic,
                "Publishing human.response event from robot service"
            );
            self.bus.publish(response);
        }

        Ok(ProcessedEvents {
            had_events,
            had_plan_events,
            human_interact_context,
            has_orphans,
        })
    }
2706
2707 pub fn process_events_from_jsonl_with_waves(
2713 &mut self,
2714 ) -> std::io::Result<ProcessedEventsWithWaves> {
2715 let result = self.event_reader.read_new_events()?;
2716
2717 let (wave_events, regular_events): (Vec<_>, Vec<_>) =
2725 result.events.into_iter().partition(|e| {
2726 e.wave_id.is_some()
2727 && self
2728 .registry
2729 .find_by_trigger(e.topic.as_str())
2730 .and_then(|hat_id| self.registry.get_config(hat_id))
2731 .is_some_and(|hat_config| hat_config.concurrency > 1)
2732 });
2733
2734 if !wave_events.is_empty() {
2735 debug!(
2736 wave_count = wave_events.len(),
2737 regular_count = regular_events.len(),
2738 "Partitioned wave events from regular events"
2739 );
2740 }
2741
2742 let regular_result = crate::event_reader::ParseResult {
2745 events: regular_events,
2746 malformed: result.malformed,
2747 };
2748 let processed = self.process_parse_result(regular_result)?;
2749
2750 Ok(ProcessedEventsWithWaves {
2751 processed,
2752 wave_events,
2753 })
2754 }
2755
2756 pub fn check_ralph_completion(&self, output: &str) -> bool {
2760 let events = EventParser::new().parse(output);
2761 events
2762 .iter()
2763 .any(|event| event.topic.as_str() == self.config.event_loop.completion_promise)
2764 }
2765
2766 pub fn publish_terminate_event(&mut self, reason: &TerminationReason) -> Event {
2773 self.stop_robot_service();
2775
2776 let elapsed = self.state.elapsed();
2777 let duration_str = format_duration(elapsed);
2778
2779 let payload = format!(
2780 "## Reason\n{}\n\n## Status\n{}\n\n## Summary\n- Iterations: {}\n- Duration: {}\n- Exit code: {}",
2781 reason.as_str(),
2782 termination_status_text(reason),
2783 self.state.iteration,
2784 duration_str,
2785 reason.exit_code()
2786 );
2787
2788 let event = Event::new("loop.terminate", &payload);
2789
2790 self.bus.publish(event.clone());
2792
2793 info!(
2794 reason = %reason.as_str(),
2795 iterations = self.state.iteration,
2796 duration = %duration_str,
2797 "Wrapping up: {}. {} iterations in {}.",
2798 reason.as_str(),
2799 self.state.iteration,
2800 duration_str
2801 );
2802
2803 event
2804 }
2805
2806 pub fn robot_shutdown_flag(&self) -> Option<Arc<AtomicBool>> {
2811 self.robot_service.as_ref().map(|s| s.shutdown_flag())
2812 }
2813
2814 fn stop_robot_service(&mut self) {
2818 if let Some(service) = self.robot_service.take() {
2819 service.stop();
2820 }
2821 }
2822
2823 pub fn check_for_user_prompt(&self, events: &[Event]) -> Option<UserPrompt> {
2831 events
2832 .iter()
2833 .find(|e| e.topic.as_str() == "user.prompt")
2834 .map(|e| UserPrompt {
2835 id: Self::extract_prompt_id(&e.payload),
2836 text: e.payload.clone(),
2837 })
2838 }
2839
2840 fn extract_prompt_id(payload: &str) -> String {
2845 if let Some(start) = payload.find("id=\"")
2847 && let Some(end) = payload[start + 4..].find('"')
2848 {
2849 return payload[start + 4..start + 4 + end].to_string();
2850 }
2851
2852 format!("q{}", Self::generate_prompt_id())
2854 }
2855
2856 fn generate_prompt_id() -> String {
2859 use std::time::{SystemTime, UNIX_EPOCH};
2860 let nanos = SystemTime::now()
2861 .duration_since(UNIX_EPOCH)
2862 .unwrap()
2863 .as_nanos();
2864 format!("{:x}", nanos % 0xFFFF_FFFF)
2865 }
2866}
2867
/// A `user.prompt` event surfaced to the caller of `check_for_user_prompt`.
#[derive(Debug, Clone)]
pub struct UserPrompt {
    /// Prompt identifier taken from an `id="..."` marker in the event payload, or a
    /// generated `q`-prefixed fallback when the payload has no usable marker.
    pub id: String,
    /// The full, unmodified event payload.
    pub text: String,
}
2878
/// Renders a duration as a compact human-readable string.
///
/// Only whole seconds are considered (sub-second precision is dropped). Leading zero
/// units are omitted, so the output looks like `5s`, `2m 5s`, or `1h 0m 5s`.
fn format_duration(d: Duration) -> String {
    let total = d.as_secs();
    let (h, m, s) = (total / 3600, total / 60 % 60, total % 60);

    match (h, m) {
        (0, 0) => format!("{}s", s),
        (0, _) => format!("{}m {}s", m, s),
        _ => format!("{}h {}m {}s", h, m, s),
    }
}
2894
2895fn termination_status_text(reason: &TerminationReason) -> &'static str {
2897 match reason {
2898 TerminationReason::CompletionPromise => "All tasks completed successfully.",
2899 TerminationReason::MaxIterations => "Stopped at iteration limit.",
2900 TerminationReason::MaxRuntime => "Stopped at runtime limit.",
2901 TerminationReason::MaxCost => "Stopped at cost limit.",
2902 TerminationReason::ConsecutiveFailures => "Too many consecutive failures.",
2903 TerminationReason::LoopThrashing => {
2904 "Loop thrashing detected - same hat repeatedly blocked."
2905 }
2906 TerminationReason::LoopStale => {
2907 "Stale loop detected - same topic emitted 3+ times consecutively."
2908 }
2909 TerminationReason::ValidationFailure => "Too many consecutive malformed JSONL events.",
2910 TerminationReason::Stopped => "Manually stopped.",
2911 TerminationReason::Interrupted => "Interrupted by signal.",
2912 TerminationReason::RestartRequested => "Restarting by human request.",
2913 TerminationReason::WorkspaceGone => "Workspace directory removed externally.",
2914 TerminationReason::Cancelled => "Cancelled gracefully (human rejection or timeout).",
2915 }
2916}