1use super::state_store::{RalphRunState, RalphStateStore, StoryResultEntry};
4use super::types::*;
5use crate::bus::AgentBus;
6use crate::bus::relay::{ProtocolRelayRuntime, RelayAgentProfile};
7use crate::provenance::{
8 ExecutionOrigin, ExecutionProvenance, git_commit_with_provenance_blocking,
9};
10use crate::provider::{ContentPart, Message, Provider, ProviderRegistry, Role};
11use crate::session::{Session, SessionEvent};
12use crate::swarm::{executor::AgentLoopExit, run_agent_loop};
13use crate::tool::ToolRegistry;
14use crate::tui::ralph_view::{RalphEvent, RalphStoryInfo, RalphStoryStatus};
15use crate::tui::swarm_view::SwarmEvent;
16use crate::worktree::WorktreeManager;
17use std::collections::HashMap;
18use std::path::{Path, PathBuf};
19use std::process::Command;
20use std::sync::Arc;
21use tokio::sync::mpsc;
22use tracing::{debug, info, warn};
23
24pub struct RalphLoop {
26 state: RalphState,
27 provider: Arc<dyn Provider>,
28 model: String,
29 config: RalphConfig,
30 event_tx: Option<mpsc::Sender<RalphEvent>>,
31 bus: Option<Arc<AgentBus>>,
32 registry: Option<Arc<ProviderRegistry>>,
33 store: Option<Arc<dyn RalphStateStore>>,
34 run_id: String,
35}
36
37impl RalphLoop {
38 pub async fn new(
40 prd_path: PathBuf,
41 provider: Arc<dyn Provider>,
42 model: String,
43 config: RalphConfig,
44 ) -> anyhow::Result<Self> {
45 let prd = Prd::load(&prd_path).await?;
46
47 let working_dir = if let Some(parent) = prd_path.parent() {
49 if parent.as_os_str().is_empty() {
50 std::env::current_dir()?
51 } else {
52 parent.to_path_buf()
53 }
54 } else {
55 std::env::current_dir()?
56 };
57
58 info!(
59 "Loaded PRD: {} - {} ({} stories, {} already passed)",
60 prd.project,
61 prd.feature,
62 prd.user_stories.len(),
63 prd.passed_count()
64 );
65
66 let resumed_iterations = prd.passed_count();
68
69 let state = RalphState {
70 prd,
71 current_iteration: resumed_iterations,
72 max_iterations: config.max_iterations,
73 status: RalphStatus::Pending,
74 progress_log: Vec::new(),
75 prd_path: prd_path.clone(),
76 working_dir,
77 };
78
79 Ok(Self {
80 state,
81 provider,
82 model,
83 config,
84 event_tx: None,
85 bus: None,
86 registry: None,
87 store: None,
88 run_id: uuid::Uuid::new_v4().to_string(),
89 })
90 }
91
92 pub fn with_event_tx(mut self, tx: mpsc::Sender<RalphEvent>) -> Self {
94 self.event_tx = Some(tx);
95 self
96 }
97
98 pub fn with_bus(mut self, bus: Arc<AgentBus>) -> Self {
100 self.bus = Some(bus);
101 self
102 }
103
104 pub fn with_registry(mut self, registry: Arc<ProviderRegistry>) -> Self {
106 self.registry = Some(registry);
107 self
108 }
109
110 pub fn with_store(mut self, store: Arc<dyn RalphStateStore>) -> Self {
112 self.store = Some(store);
113 self
114 }
115
116 pub fn with_run_id(mut self, run_id: String) -> Self {
118 self.run_id = run_id;
119 self
120 }
121
122 fn store_fire_and_forget(
124 &self,
125 fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
126 ) {
127 tokio::spawn(async move {
128 if let Err(e) = fut.await {
129 warn!(error = %e, "State store update failed");
130 }
131 });
132 }
133
134 fn try_send_event(&self, event: RalphEvent) {
136 if let Some(ref tx) = self.event_tx {
137 let _ = tx.try_send(event);
138 }
139 }
140
141 fn bus_publish_story_result(
143 &self,
144 story: &UserStory,
145 iteration: usize,
146 learnings: &[String],
147 next_story_id: Option<&str>,
148 ) {
149 let Some(ref bus) = self.bus else { return };
150 let prd_id = &self.state.prd.project;
151 let handle = bus.handle(format!("ralph.{}", story.id));
152
153 handle.publish_ralph_learning(
155 prd_id,
156 &story.id,
157 iteration,
158 learnings.to_vec(),
159 serde_json::json!({
160 "story_title": story.title,
161 "passed": story.passes,
162 }),
163 );
164
165 if let Some(next_id) = next_story_id {
167 handle.publish_ralph_handoff(
168 prd_id,
169 &story.id,
170 next_id,
171 serde_json::json!({ "learnings": learnings }),
172 &format!(
173 "{} {} (iteration {})",
174 story.id,
175 if story.passes { "passed" } else { "failed" },
176 iteration
177 ),
178 );
179 }
180
181 handle.publish_ralph_progress(
183 prd_id,
184 self.state.prd.passed_count(),
185 self.state.prd.user_stories.len(),
186 iteration,
187 &format!("{:?}", self.state.status),
188 );
189 }
190
191 fn bus_collect_learnings(&self) -> Vec<String> {
193 let Some(ref bus) = self.bus else {
194 return Vec::new();
195 };
196 let prd_id = &self.state.prd.project;
197 let mut handle = bus.handle("ralph-collector");
198 let envelopes = handle.drain_ralph_learnings(prd_id);
199 let mut learnings = Vec::new();
200 for env in envelopes {
201 match env.message {
202 crate::bus::BusMessage::RalphLearning {
203 story_id,
204 iteration,
205 learnings: items,
206 ..
207 } => {
208 for l in items {
209 learnings.push(format!("[{story_id} iter {iteration}] {l}"));
210 }
211 }
212 crate::bus::BusMessage::RalphHandoff {
213 from_story,
214 progress_summary,
215 ..
216 } => {
217 learnings.push(format!("[handoff from {from_story}] {progress_summary}"));
218 }
219 _ => {}
220 }
221 }
222 learnings
223 }
224
225 fn create_swarm_event_bridge(
229 ralph_tx: &mpsc::Sender<RalphEvent>,
230 story_id: String,
231 ) -> (mpsc::Sender<SwarmEvent>, tokio::task::JoinHandle<()>) {
232 let (swarm_tx, mut swarm_rx) = mpsc::channel::<SwarmEvent>(100);
233 let ralph_tx = ralph_tx.clone();
234 let handle = tokio::spawn(async move {
235 while let Some(event) = swarm_rx.recv().await {
236 let ralph_event = match event {
237 SwarmEvent::AgentToolCall { tool_name, .. } => RalphEvent::StoryToolCall {
238 story_id: story_id.clone(),
239 tool_name,
240 },
241 SwarmEvent::AgentToolCallDetail { detail, .. } => {
242 RalphEvent::StoryToolCallDetail {
243 story_id: story_id.clone(),
244 detail,
245 }
246 }
247 SwarmEvent::AgentMessage { entry, .. } => RalphEvent::StoryMessage {
248 story_id: story_id.clone(),
249 entry,
250 },
251 SwarmEvent::AgentOutput { output, .. } => RalphEvent::StoryOutput {
252 story_id: story_id.clone(),
253 output,
254 },
255 SwarmEvent::AgentError { error, .. } => RalphEvent::StoryError {
256 story_id: story_id.clone(),
257 error,
258 },
259 _ => continue, };
261 if ralph_tx.send(ralph_event).await.is_err() {
262 break;
263 }
264 }
265 });
266 (swarm_tx, handle)
267 }
268
269 fn build_story_infos(prd: &Prd) -> Vec<RalphStoryInfo> {
271 prd.user_stories
272 .iter()
273 .map(|s| RalphStoryInfo {
274 id: s.id.clone(),
275 title: s.title.clone(),
276 status: if s.passes {
277 RalphStoryStatus::Passed
278 } else {
279 RalphStoryStatus::Pending
280 },
281 priority: s.priority,
282 depends_on: s.depends_on.clone(),
283 quality_checks: Vec::new(),
284 tool_call_history: Vec::new(),
285 messages: Vec::new(),
286 output: None,
287 error: None,
288 merge_summary: None,
289 steps: 0,
290 current_tool: None,
291 })
292 .collect()
293 }
294
295 pub async fn run(&mut self) -> anyhow::Result<RalphState> {
297 self.state.status = RalphStatus::Running;
298
299 if let Some(ref store) = self.store {
301 let initial_state = RalphRunState {
302 run_id: self.run_id.clone(),
303 okr_id: None,
304 prd: self.state.prd.clone(),
305 config: self.config.clone(),
306 status: RalphStatus::Running,
307 current_iteration: 0,
308 max_iterations: self.state.max_iterations,
309 progress_log: Vec::new(),
310 story_results: Vec::new(),
311 error: None,
312 created_at: chrono::Utc::now().to_rfc3339(),
313 started_at: Some(chrono::Utc::now().to_rfc3339()),
314 completed_at: None,
315 };
316 let s = store.clone();
317 self.store_fire_and_forget(async move { s.create_run(&initial_state).await });
318 }
319
320 self.try_send_event(RalphEvent::Started {
322 project: self.state.prd.project.clone(),
323 feature: self.state.prd.feature.clone(),
324 stories: Self::build_story_infos(&self.state.prd),
325 max_iterations: self.state.max_iterations,
326 });
327
328 if !self.state.prd.branch_name.is_empty() {
330 info!("Switching to branch: {}", self.state.prd.branch_name);
331 self.git_checkout(&self.state.prd.branch_name)?;
332 }
333
334 if self.config.parallel_enabled {
336 self.run_parallel().await?;
337 } else {
338 self.run_sequential().await?;
339 }
340
341 if self.state.status != RalphStatus::Completed {
345 let passed = self.state.prd.passed_count();
346 let total = self.state.prd.user_stories.len();
347
348 if self.state.prd.is_complete() {
349 self.state.status = RalphStatus::Completed;
350 } else if self.state.current_iteration >= self.state.max_iterations {
351 if self.state.current_iteration > 0 && passed == 0 && total > 0 {
352 self.state.status = RalphStatus::QualityFailed;
354 warn!(
355 iterations = self.state.current_iteration,
356 passed = passed,
357 total = total,
358 "Ralph failed: all stories failed quality checks"
359 );
360 } else {
361 self.state.status = RalphStatus::MaxIterations;
363 info!(
364 iterations = self.state.current_iteration,
365 passed = passed,
366 total = total,
367 "Ralph finished with partial progress"
368 );
369 }
370 } else if self.state.current_iteration > 0 {
371 if passed == 0 && total > 0 {
374 self.state.status = RalphStatus::QualityFailed;
375 warn!(
376 iterations = self.state.current_iteration,
377 passed = passed,
378 total = total,
379 "Ralph ended with no passing stories"
380 );
381 } else {
382 self.state.status = RalphStatus::MaxIterations;
383 info!(
384 iterations = self.state.current_iteration,
385 passed = passed,
386 total = total,
387 "Ralph ended before full completion"
388 );
389 }
390 } else {
391 self.state.status = RalphStatus::MaxIterations;
392 }
393 }
394
395 if self.config.worktree_enabled {
397 let mgr = WorktreeManager::new(&self.state.working_dir);
398 match mgr.cleanup_all().await {
399 Ok(count) if count > 0 => {
400 info!(cleaned = count, "Cleaned up orphaned worktrees/branches");
401 }
402 Ok(_) => {}
403 Err(e) => {
404 warn!(error = %e, "Failed to cleanup orphaned worktrees");
405 }
406 }
407 }
408
409 if let Some(ref store) = self.store {
411 let s = store.clone();
412 let rid = self.run_id.clone();
413 let status = self.state.status;
414 let prd = self.state.prd.clone();
415 self.store_fire_and_forget(async move {
416 s.update_prd(&rid, &prd).await?;
417 s.complete_run(&rid, status).await
418 });
419 }
420
421 info!(
422 "Ralph finished: {:?}, {}/{} stories passed",
423 self.state.status,
424 self.state.prd.passed_count(),
425 self.state.prd.user_stories.len()
426 );
427
428 self.try_send_event(RalphEvent::Complete {
430 status: format!("{:?}", self.state.status),
431 passed: self.state.prd.passed_count(),
432 total: self.state.prd.user_stories.len(),
433 });
434
435 Ok(self.state.clone())
436 }
437
438 async fn run_sequential(&mut self) -> anyhow::Result<()> {
440 while self.state.current_iteration < self.state.max_iterations {
441 self.state.current_iteration += 1;
442
443 if let Some(ref store) = self.store {
445 let s = store.clone();
446 let rid = self.run_id.clone();
447 let iter = self.state.current_iteration;
448 self.store_fire_and_forget(async move { s.update_iteration(&rid, iter).await });
449 }
450
451 info!(
452 "=== Ralph iteration {} of {} ===",
453 self.state.current_iteration, self.state.max_iterations
454 );
455
456 self.try_send_event(RalphEvent::IterationStarted {
458 iteration: self.state.current_iteration,
459 max_iterations: self.state.max_iterations,
460 });
461
462 if self.state.prd.is_complete() {
464 info!("All stories complete!");
465 self.state.status = RalphStatus::Completed;
466 break;
467 }
468
469 let story = match self.state.prd.next_story() {
471 Some(s) => s.clone(),
472 None => {
473 warn!("No available stories (dependencies not met)");
474 break;
475 }
476 };
477
478 info!("Working on story: {} - {}", story.id, story.title);
479
480 self.try_send_event(RalphEvent::StoryStarted {
482 story_id: story.id.clone(),
483 });
484
485 let bus_learnings = self.bus_collect_learnings();
487
488 let prompt = if bus_learnings.is_empty() {
490 self.build_prompt(&story)
491 } else {
492 let mut p = self.build_prompt(&story);
493 p.push_str("\n## Learnings from Previous Iterations:\n");
494 for l in &bus_learnings {
495 p.push_str(&format!("- {l}\n"));
496 }
497 p
498 };
499
500 match self.call_llm(&story.id, &prompt).await {
502 Ok(response) => {
503 let entry = ProgressEntry {
505 story_id: story.id.clone(),
506 iteration: self.state.current_iteration,
507 status: "completed".to_string(),
508 learnings: self.extract_learnings(&response),
509 files_changed: Vec::new(),
510 timestamp: chrono::Utc::now().to_rfc3339(),
511 };
512 self.append_progress(&entry, &response)?;
513 let entry_learnings = entry.learnings.clone();
514 self.state.progress_log.push(entry);
515
516 if self.config.quality_checks_enabled {
518 if self.run_quality_gates_with_events(&story.id).await? {
519 info!("Story {} passed quality checks!", story.id);
520 self.state.prd.mark_passed(&story.id);
521
522 if let Some(ref store) = self.store {
524 let s = store.clone();
525 let rid = self.run_id.clone();
526 let entry = StoryResultEntry {
527 story_id: story.id.clone(),
528 title: story.title.clone(),
529 passed: true,
530 iteration: self.state.current_iteration,
531 error: None,
532 };
533 self.store_fire_and_forget(async move {
534 s.record_story_result(&rid, &entry).await
535 });
536 }
537
538 self.try_send_event(RalphEvent::StoryComplete {
539 story_id: story.id.clone(),
540 passed: true,
541 });
542
543 let next = self.state.prd.next_story().map(|s| s.id.clone());
545 self.bus_publish_story_result(
546 &story,
547 self.state.current_iteration,
548 &entry_learnings,
549 next.as_deref(),
550 );
551
552 if self.config.auto_commit {
554 self.commit_story(&story)?;
555 }
556
557 self.state.prd.save(&self.state.prd_path).await?;
559 } else {
560 warn!("Story {} failed quality checks", story.id);
561
562 if let Some(ref store) = self.store {
564 let s = store.clone();
565 let rid = self.run_id.clone();
566 let entry = StoryResultEntry {
567 story_id: story.id.clone(),
568 title: story.title.clone(),
569 passed: false,
570 iteration: self.state.current_iteration,
571 error: Some("Quality checks failed".to_string()),
572 };
573 self.store_fire_and_forget(async move {
574 s.record_story_result(&rid, &entry).await
575 });
576 }
577
578 let next = self.state.prd.next_story().map(|s| s.id.clone());
580 self.bus_publish_story_result(
581 &story,
582 self.state.current_iteration,
583 &entry_learnings,
584 next.as_deref(),
585 );
586
587 self.try_send_event(RalphEvent::StoryComplete {
588 story_id: story.id.clone(),
589 passed: false,
590 });
591 }
592 } else {
593 self.state.prd.mark_passed(&story.id);
595 self.state.prd.save(&self.state.prd_path).await?;
596
597 if let Some(ref store) = self.store {
599 let s = store.clone();
600 let rid = self.run_id.clone();
601 let entry = StoryResultEntry {
602 story_id: story.id.clone(),
603 title: story.title.clone(),
604 passed: true,
605 iteration: self.state.current_iteration,
606 error: None,
607 };
608 self.store_fire_and_forget(async move {
609 s.record_story_result(&rid, &entry).await
610 });
611 }
612
613 self.try_send_event(RalphEvent::StoryComplete {
614 story_id: story.id.clone(),
615 passed: true,
616 });
617 }
618 }
619 Err(e) => {
620 warn!("LLM call failed: {}", e);
621 let entry = ProgressEntry {
622 story_id: story.id.clone(),
623 iteration: self.state.current_iteration,
624 status: format!("failed: {}", e),
625 learnings: Vec::new(),
626 files_changed: Vec::new(),
627 timestamp: chrono::Utc::now().to_rfc3339(),
628 };
629 self.state.progress_log.push(entry);
630
631 self.try_send_event(RalphEvent::StoryError {
632 story_id: story.id.clone(),
633 error: format!("{}", e),
634 });
635 }
636 }
637 }
638
639 Ok(())
640 }
641
642 async fn run_parallel(&mut self) -> anyhow::Result<()> {
644 let stages: Vec<Vec<UserStory>> = self
646 .state
647 .prd
648 .stages()
649 .into_iter()
650 .map(|stage| stage.into_iter().cloned().collect())
651 .collect();
652 let total_stages = stages.len();
653
654 info!(
655 "Parallel execution: {} stages, {} max concurrent stories",
656 total_stages, self.config.max_concurrent_stories
657 );
658
659 let worktree_mgr = if self.config.worktree_enabled {
661 let mgr = WorktreeManager::new(&self.state.working_dir);
662 info!("Worktree isolation enabled for parallel stories");
663 Some(Arc::new(mgr))
664 } else {
665 None
666 };
667
668 for (stage_idx, stage_stories) in stages.into_iter().enumerate() {
669 if self.state.prd.is_complete() {
670 info!("All stories complete!");
671 self.state.status = RalphStatus::Completed;
672 break;
673 }
674
675 if self.state.current_iteration >= self.state.max_iterations {
676 break;
677 }
678
679 let story_count = stage_stories.len();
680 info!(
681 "=== Stage {}/{}: {} stories in parallel ===",
682 stage_idx + 1,
683 total_stages,
684 story_count
685 );
686
687 let stories: Vec<UserStory> = stage_stories;
689
690 let semaphore = Arc::new(tokio::sync::Semaphore::new(
692 self.config.max_concurrent_stories,
693 ));
694 let provider = Arc::clone(&self.provider);
695 let model = self.model.clone();
696 let prd_info = (
697 self.state.prd.project.clone(),
698 self.state.prd.feature.clone(),
699 );
700 let working_dir = self.state.working_dir.clone();
701 let progress_path = self.config.progress_path.clone();
702
703 let mut handles = Vec::new();
704
705 let accumulated_learnings = self.bus_collect_learnings();
707
708 for story in stories {
709 let sem = Arc::clone(&semaphore);
710 let provider = Arc::clone(&provider);
711 let model = model.clone();
712 let prd_info = prd_info.clone();
713 let working_dir = working_dir.clone();
714 let worktree_mgr = worktree_mgr.clone();
715 let progress_path = progress_path.clone();
716 let ralph_tx = self.event_tx.clone();
717 let stage_learnings = accumulated_learnings.clone();
718 let bus = self.bus.clone();
719 let relay_enabled = self.config.relay_enabled;
720 let relay_max_agents = self.config.relay_max_agents;
721 let relay_max_rounds = self.config.relay_max_rounds;
722 let registry = self.registry.clone();
723 let max_steps_per_story = self.config.max_steps_per_story;
724 let max_quality_retries = self.config.max_quality_retries;
725 let max_rate_limit_retries = self.config.max_rate_limit_retries;
726 let rate_limit_base_delay_ms = self.config.rate_limit_base_delay_ms;
727 let quality_checks_enabled = self.config.quality_checks_enabled;
728 let quality_checks = self.state.prd.quality_checks.clone();
729
730 let handle: tokio::task::JoinHandle<(
731 crate::ralph::types::UserStory,
732 bool,
733 crate::ralph::types::ProgressEntry,
734 Option<crate::worktree::WorktreeInfo>,
735 Option<std::sync::Arc<crate::worktree::WorktreeManager>>,
736 bool, )> = tokio::spawn(async move {
738 let _permit = sem.acquire().await.expect("semaphore closed");
739
740 let (story_working_dir, worktree_info) = if let Some(ref mgr) = worktree_mgr {
742 match mgr.create(&story.id.to_lowercase().replace("-", "_")).await {
743 Ok(wt) => {
744 if let Err(e) = mgr.inject_workspace_stub(&wt.path) {
746 warn!(
747 story_id = %story.id,
748 error = %e,
749 "Failed to inject workspace stub"
750 );
751 }
752 info!(
753 story_id = %story.id,
754 worktree_path = %wt.path.display(),
755 "Created worktree for story"
756 );
757 (wt.path.clone(), Some(wt))
758 }
759 Err(e) => {
760 warn!(
761 story_id = %story.id,
762 error = %e,
763 "Failed to create worktree, using main directory"
764 );
765 (working_dir.clone(), None)
766 }
767 }
768 } else {
769 (working_dir.clone(), None)
770 };
771
772 info!(
773 "Working on story: {} - {} (in {:?})",
774 story.id, story.title, story_working_dir
775 );
776
777 if let Some(ref tx) = ralph_tx {
779 let _ = tx
780 .send(RalphEvent::StoryStarted {
781 story_id: story.id.clone(),
782 })
783 .await;
784 }
785
786 let mut prompt =
788 Self::build_story_prompt(&story, &prd_info, &story_working_dir);
789 if !stage_learnings.is_empty() {
790 prompt.push_str("\n## Learnings from Previous Stages:\n");
791 for l in &stage_learnings {
792 prompt.push_str(&format!("- {l}\n"));
793 }
794 }
795
796 let (bridge_tx, _bridge_handle) = if !relay_enabled {
798 if let Some(ref tx) = ralph_tx {
799 let (btx, handle) =
800 Self::create_swarm_event_bridge(tx, story.id.clone());
801 (Some(btx), Some(handle))
802 } else {
803 (None, None)
804 }
805 } else {
806 (None, None)
807 };
808
809 let call_result = {
811 let mut last_err = None;
812 let mut attempt_result = None;
813 for attempt in 0..=max_rate_limit_retries {
814 let res = if relay_enabled {
815 if let Some(ref reg) = registry {
816 Self::call_relay_static(
817 reg,
818 &model,
819 &prompt,
820 &story_working_dir,
821 ralph_tx.clone(),
822 story.id.clone(),
823 bus.clone(),
824 relay_max_agents,
825 relay_max_rounds,
826 )
827 .await
828 } else {
829 warn!(
830 story_id = %story.id,
831 "Relay enabled but no registry available, using single agent"
832 );
833 Self::call_llm_static(
834 &provider,
835 &model,
836 &prompt,
837 &story_working_dir,
838 bridge_tx.clone(),
839 story.id.clone(),
840 bus.clone(),
841 max_steps_per_story,
842 )
843 .await
844 }
845 } else {
846 Self::call_llm_static(
847 &provider,
848 &model,
849 &prompt,
850 &story_working_dir,
851 bridge_tx.clone(),
852 story.id.clone(),
853 bus.clone(),
854 max_steps_per_story,
855 )
856 .await
857 };
858 match res {
859 Ok(output) => {
860 attempt_result = Some(Ok(output));
861 break;
862 }
863 Err(e) => {
864 let err_str = format!("{e}");
865 let is_rate_limit = err_str.contains("Rate limit")
866 || err_str.contains("rate_limit")
867 || err_str.contains("429")
868 || err_str.contains("too many requests");
869 if is_rate_limit && attempt < max_rate_limit_retries {
870 let delay_ms = rate_limit_base_delay_ms
871 * 2u64.saturating_pow(attempt as u32);
872 warn!(
873 story_id = %story.id,
874 attempt = attempt + 1,
875 max_retries = max_rate_limit_retries,
876 delay_ms = delay_ms,
877 "Rate limited, backing off before retry"
878 );
879 tokio::time::sleep(std::time::Duration::from_millis(
880 delay_ms,
881 ))
882 .await;
883 } else {
884 last_err = Some(e);
885 break;
886 }
887 }
888 }
889 }
890 attempt_result.unwrap_or_else(|| {
891 Err(last_err
892 .unwrap_or_else(|| anyhow::anyhow!("All retries exhausted")))
893 })
894 };
895
896 let (final_result, quality_passed) = match call_result {
899 Ok(initial_output) => {
900 let mut output = initial_output;
901 let mut qg_passed = !quality_checks_enabled; if quality_checks_enabled {
904 for qg_attempt in 0..=max_quality_retries {
905 let cargo_root = Self::find_cargo_root(&story_working_dir);
906 let checks = &quality_checks;
907 let mut all_ok = true;
908 let mut error_output = String::new();
909
910 for (name, cmd) in [
911 ("typecheck", &checks.typecheck),
912 ("lint", &checks.lint),
913 ("test", &checks.test),
914 ("build", &checks.build),
915 ] {
916 if let Some(command) = cmd {
917 let cmd_output = std::process::Command::new("/bin/sh")
918 .arg("-c")
919 .arg(command)
920 .current_dir(&cargo_root)
921 .output();
922 match cmd_output {
923 Ok(o) if o.status.success() => {}
924 Ok(o) => {
925 let stderr = String::from_utf8_lossy(&o.stderr);
926 let stdout = String::from_utf8_lossy(&o.stdout);
927 let combined = format!("{stdout}\n{stderr}");
928 let errors: String = combined
930 .lines()
931 .filter(|l| {
932 l.starts_with("error")
933 || l.contains("error:")
934 || l.contains("error[")
935 })
936 .take(20)
937 .collect::<Vec<_>>()
938 .join("\n");
939 error_output = format!(
940 "Quality check `{name}` failed:\n{errors}"
941 );
942 all_ok = false;
943 break;
944 }
945 Err(e) => {
946 error_output = format!(
947 "Quality check `{name}` could not run: {e}"
948 );
949 all_ok = false;
950 break;
951 }
952 }
953 }
954 }
955
956 if all_ok {
957 qg_passed = true;
958 info!(
959 story_id = %story.id,
960 attempt = qg_attempt + 1,
961 "Quality gates passed"
962 );
963 break;
964 }
965
966 if qg_attempt < max_quality_retries {
967 warn!(
969 story_id = %story.id,
970 attempt = qg_attempt + 1,
971 max_retries = max_quality_retries,
972 "Quality gates failed, spawning repair agent"
973 );
974
975 let repair_prompt = format!(
976 "# QUALITY GATE REPAIR (attempt {}/{})\n\n\
977 The previous implementation attempt for story {} had quality gate failures.\n\n\
978 ## Errors:\n```\n{}\n```\n\n\
979 ## Instructions:\n\
980 1. Read the files mentioned in the errors\n\
981 2. Fix ONLY the compile/lint errors shown above\n\
982 3. Run `cargo check 2>&1` to verify your fix\n\
983 4. Do NOT refactor or change unrelated code\n\n\
984 Working directory: {}",
985 qg_attempt + 1,
986 max_quality_retries,
987 story.id,
988 error_output,
989 story_working_dir.display()
990 );
991
992 let repair_result = Self::call_llm_static(
994 &provider,
995 &model,
996 &repair_prompt,
997 &story_working_dir,
998 bridge_tx.clone(),
999 story.id.clone(),
1000 bus.clone(),
1001 20, )
1003 .await;
1004
1005 match repair_result {
1006 Ok(repair_output) => {
1007 output = format!(
1008 "{output}\n---\n[repair attempt {}] {repair_output}",
1009 qg_attempt + 1
1010 );
1011 }
1012 Err(e) => {
1013 warn!(
1014 story_id = %story.id,
1015 error = %e,
1016 "Repair agent failed"
1017 );
1018 break;
1019 }
1020 }
1021 } else {
1022 warn!(
1023 story_id = %story.id,
1024 "Quality gates still failing after {} retries",
1025 max_quality_retries
1026 );
1027 }
1028 }
1029 }
1030
1031 (Ok(output), qg_passed)
1032 }
1033 Err(e) => (Err(e), false),
1034 };
1035
1036 let entry = match &final_result {
1037 Ok(response) => {
1038 let progress_file = story_working_dir.join(&progress_path);
1040 let _ = std::fs::write(&progress_file, response);
1041
1042 ProgressEntry {
1043 story_id: story.id.clone(),
1044 iteration: 1,
1045 status: "completed".to_string(),
1046 learnings: Self::extract_learnings_static(response),
1047 files_changed: Vec::new(),
1048 timestamp: chrono::Utc::now().to_rfc3339(),
1049 }
1050 }
1051 Err(e) => {
1052 warn!("LLM call failed for story {}: {}", story.id, e);
1053 ProgressEntry {
1054 story_id: story.id.clone(),
1055 iteration: 1,
1056 status: format!("failed: {}", e),
1057 learnings: Vec::new(),
1058 files_changed: Vec::new(),
1059 timestamp: chrono::Utc::now().to_rfc3339(),
1060 }
1061 }
1062 };
1063
1064 (
1065 story,
1066 final_result.is_ok(),
1067 entry,
1068 worktree_info,
1069 worktree_mgr,
1070 quality_passed,
1071 )
1072 });
1073
1074 handles.push(handle);
1075 }
1076
1077 let _stage_passed = 0usize;
1079 for handle in handles {
1080 match handle.await {
1081 Ok((story, success, entry, worktree_info, worktree_mgr, quality_passed)) => {
1082 self.state.progress_log.push(entry);
1083
1084 if success && quality_passed {
1085 {
1088 info!("Story {} passed quality checks!", story.id);
1089
1090 if let Some(ref wt) = worktree_info {
1092 let _ = self.commit_in_dir(&wt.path, &story);
1093 }
1094
1095 if let (Some(wt), Some(mgr)) =
1097 (worktree_info.as_ref(), worktree_mgr.as_ref())
1098 {
1099 match mgr.merge(&wt.name).await {
1100 Ok(merge_result) => {
1101 if merge_result.success {
1102 info!(
1103 story_id = %story.id,
1104 files_changed = merge_result.files_changed,
1105 "Merged story changes successfully"
1106 );
1107 self.state.prd.mark_passed(&story.id);
1108 self.try_send_event(RalphEvent::StoryMerge {
1109 story_id: story.id.clone(),
1110 success: true,
1111 summary: merge_result.summary.clone(),
1112 });
1113 self.try_send_event(RalphEvent::StoryComplete {
1114 story_id: story.id.clone(),
1115 passed: true,
1116 });
1117
1118 if let Some(ref store) = self.store {
1120 let s = store.clone();
1121 let rid = self.run_id.clone();
1122 let entry = StoryResultEntry {
1123 story_id: story.id.clone(),
1124 title: story.title.clone(),
1125 passed: true,
1126 iteration: self.state.current_iteration,
1127 error: None,
1128 };
1129 self.store_fire_and_forget(async move {
1130 s.record_story_result(&rid, &entry).await
1131 });
1132 }
1133
1134 let _ = mgr.cleanup(&wt.name).await;
1136 } else if !merge_result.conflicts.is_empty() {
1137 info!(
1139 story_id = %story.id,
1140 num_conflicts = merge_result.conflicts.len(),
1141 "Spawning conflict resolver sub-agent"
1142 );
1143
1144 match Self::resolve_conflicts_static(
1146 &provider,
1147 &model,
1148 &working_dir,
1149 &story,
1150 &merge_result.conflicts,
1151 &merge_result.conflict_diffs,
1152 self.bus.clone(),
1153 )
1154 .await
1155 {
1156 Ok(resolved) => {
1157 if resolved {
1158 let commit_msg = format!(
1160 "Merge: resolved conflicts for {}",
1161 story.id
1162 );
1163 match mgr
1164 .complete_merge(
1165 &wt.name,
1166 &commit_msg,
1167 )
1168 .await
1169 {
1170 Ok(final_result) => {
1171 if final_result.success {
1172 info!(
1173 story_id = %story.id,
1174 "Merge completed after conflict resolution"
1175 );
1176 self.state
1177 .prd
1178 .mark_passed(&story.id);
1179 } else {
1180 warn!(
1181 story_id = %story.id,
1182 "Merge failed even after resolution"
1183 );
1184 let _ = mgr
1185 .abort_merge(&wt.name)
1186 .await;
1187 }
1188 }
1189 Err(e) => {
1190 warn!(
1191 story_id = %story.id,
1192 error = %e,
1193 "Failed to complete merge after resolution"
1194 );
1195 let _ = mgr
1196 .abort_merge(&wt.name)
1197 .await;
1198 }
1199 }
1200 } else {
1201 warn!(
1202 story_id = %story.id,
1203 "Conflict resolver could not resolve all conflicts"
1204 );
1205 let _ = mgr.abort_merge(&wt.name).await;
1206 }
1207 }
1208 Err(e) => {
1209 warn!(
1210 story_id = %story.id,
1211 error = %e,
1212 "Conflict resolver failed"
1213 );
1214 let _ = mgr.abort_merge(&wt.name).await;
1215 }
1216 }
1217 let _ = mgr.cleanup(&wt.name).await;
1219 } else if merge_result.aborted {
1220 warn!(
1222 story_id = %story.id,
1223 summary = %merge_result.summary,
1224 "Merge was aborted due to non-conflict failure"
1225 );
1226 let _ = mgr.cleanup(&wt.name).await;
1228 } else {
1229 warn!(
1231 story_id = %story.id,
1232 summary = %merge_result.summary,
1233 "Merge failed but not aborted - manual intervention may be needed"
1234 );
1235 }
1237 }
1238 Err(e) => {
1239 warn!(
1240 story_id = %story.id,
1241 error = %e,
1242 "Failed to merge worktree"
1243 );
1244 }
1245 }
1246 } else {
1247 self.state.prd.mark_passed(&story.id);
1249 self.try_send_event(RalphEvent::StoryComplete {
1250 story_id: story.id.clone(),
1251 passed: true,
1252 });
1253
1254 if let Some(ref store) = self.store {
1256 let s = store.clone();
1257 let rid = self.run_id.clone();
1258 let entry = StoryResultEntry {
1259 story_id: story.id.clone(),
1260 title: story.title.clone(),
1261 passed: true,
1262 iteration: self.state.current_iteration,
1263 error: None,
1264 };
1265 self.store_fire_and_forget(async move {
1266 s.record_story_result(&rid, &entry).await
1267 });
1268 }
1269 }
1270 }
1271 } else {
1272 self.try_send_event(RalphEvent::StoryError {
1274 story_id: story.id.clone(),
1275 error: "LLM call failed".to_string(),
1276 });
1277 if let Some(ref wt) = worktree_info {
1278 info!(
1279 story_id = %story.id,
1280 worktree_path = %wt.path.display(),
1281 "Keeping worktree for debugging (story failed)"
1282 );
1283 }
1284 }
1285 }
1286 Err(e) => {
1287 warn!("Story execution task failed: {}", e);
1288 }
1289 }
1290 }
1291
1292 self.state.current_iteration += 1;
1294
1295 if self.bus.is_some() {
1297 let bus_learnings: Vec<String> = self
1298 .state
1299 .progress_log
1300 .iter()
1301 .flat_map(|e| e.learnings.clone())
1302 .collect();
1303 for story in &self.state.prd.user_stories {
1304 if story.passes {
1305 self.bus_publish_story_result(
1306 story,
1307 self.state.current_iteration,
1308 &bus_learnings,
1309 None,
1310 );
1311 }
1312 }
1313 }
1314
1315 self.state.prd.save(&self.state.prd_path).await?;
1317
1318 if let Some(ref store) = self.store {
1320 let s = store.clone();
1321 let rid = self.run_id.clone();
1322 let prd = self.state.prd.clone();
1323 let iter = self.state.current_iteration;
1324 self.store_fire_and_forget(async move {
1325 s.update_prd(&rid, &prd).await?;
1326 s.update_iteration(&rid, iter).await
1327 });
1328 }
1329 }
1330
1331 Ok(())
1332 }
1333
1334 fn build_story_prompt(
1336 story: &UserStory,
1337 prd_info: &(String, String),
1338 working_dir: &Path,
1339 ) -> String {
1340 let wd = working_dir.display();
1341 format!(
1342 r#"# PRD: {} - {}
1343
1344## Working Directory: {wd}
1345
1346## Current Story: {} - {}
1347
1348{}
1349
1350### Acceptance Criteria:
1351{}
1352
1353## WORKFLOW (follow this exactly):
1354
13551. **EXPLORE** (2-4 tool calls): Use `glob` and `read` to understand existing code
13562. **IMPLEMENT** (5-15 tool calls): Use `write` or `edit` to make changes
13573. **VERIFY**: Run a verification command appropriate to the language (see below)
13584. **FIX OR FINISH**:
1359 - If no errors: Output `STORY_COMPLETE: {}` and STOP
1360 - If errors: Parse the error, fix it, re-verify (max 3 fix attempts)
1361 - After 3 failed attempts: Output `STORY_BLOCKED: <error summary>` and STOP
1362
1363## VERIFICATION BY LANGUAGE:
1364- **Rust**: `bash` with `cargo check 2>&1`
1365- **Python**: `bash` with `python -c "import ast; ast.parse(open('FILE').read())"` for each changed file
1366- **TypeScript/JavaScript**: `bash` with `npx tsc --noEmit` (if tsconfig.json exists)
1367- Choose the right check for the files you modified
1368
1369## TOOL USAGE:
1370- `read`: Read file content (always read before editing!)
1371- `edit`: Modify files (MUST include 3+ lines before/after for unique context)
1372- `write`: Create new files
1373- `bash`: Run commands with `{{"command": "...", "cwd": "{wd}"}}`
1374
1375## CRITICAL RULES:
1376- ALWAYS read a file before editing it
1377- When edit fails with "ambiguous match", include MORE context lines
1378- Do NOT add TODO/placeholder comments
1379- Count your fix attempts - STOP after 3 failures
1380
1381## TERMINATION:
1382SUCCESS: Output `STORY_COMPLETE: {}`
1383BLOCKED: Output `STORY_BLOCKED: <brief error description>`
1384
1385Do NOT keep iterating indefinitely. Stop when done or blocked.
1386"#,
1387 prd_info.0,
1388 prd_info.1,
1389 story.id,
1390 story.title,
1391 story.description,
1392 story
1393 .acceptance_criteria
1394 .iter()
1395 .map(|c| format!("- {}", c))
1396 .collect::<Vec<_>>()
1397 .join("\n"),
1398 story.id,
1399 story.id
1400 )
1401 }
1402
1403 async fn call_llm_static(
1405 provider: &Arc<dyn Provider>,
1406 model: &str,
1407 prompt: &str,
1408 working_dir: &PathBuf,
1409 event_tx: Option<mpsc::Sender<SwarmEvent>>,
1410 story_id: String,
1411 bus: Option<Arc<AgentBus>>,
1412 max_steps_per_story: usize,
1413 ) -> anyhow::Result<String> {
1414 let system_prompt = crate::agent::builtin::build_system_prompt(working_dir);
1416
1417 let tool_registry =
1419 ToolRegistry::with_provider_arc(Arc::clone(provider), model.to_string());
1420
1421 let tool_definitions: Vec<_> = tool_registry
1423 .definitions()
1424 .into_iter()
1425 .filter(|t| t.name != "question")
1426 .collect();
1427
1428 info!(
1429 "Ralph sub-agent starting with {} tools in {:?}",
1430 tool_definitions.len(),
1431 working_dir
1432 );
1433
1434 let max_steps = max_steps_per_story;
1436 let (output, steps, tool_calls, _exit_reason) = run_agent_loop(
1437 Arc::clone(provider),
1438 model,
1439 &system_prompt,
1440 prompt,
1441 tool_definitions,
1442 tool_registry, max_steps,
1444 180, event_tx,
1446 story_id,
1447 bus.clone(),
1448 Some(working_dir.clone()),
1449 )
1450 .await?;
1451
1452 info!(
1453 "Ralph sub-agent completed: {} steps, {} tool calls",
1454 steps, tool_calls
1455 );
1456
1457 Ok(output)
1458 }
1459
1460 fn build_implementation_relay_profiles(
1462 max_agents: usize,
1463 working_dir: &std::path::Path,
1464 ) -> Vec<(String, String, Vec<String>)> {
1465 let wd = working_dir.display();
1466 let mut profiles = vec![
1467 (
1468 "auto-planner".to_string(),
1469 format!(
1470 "You are @auto-planner.\n\
1471 Specialty: Code analysis and implementation planning.\n\
1472 Mission: Read the codebase to understand existing patterns, then produce a concrete step-by-step implementation plan.\n\
1473 Working directory: {wd}\n\n\
1474 Read existing code first using the `read` tool. Identify all files that need changes.\n\
1475 Produce a numbered list of specific changes with file paths and code snippets.\n\
1476 Pass your plan in a clear format the next agent can follow step by step."
1477 ),
1478 vec!["planning".into(), "analysis".into(), "relay".into()],
1479 ),
1480 (
1481 "auto-coder".to_string(),
1482 format!(
1483 "You are @auto-coder.\n\
1484 Specialty: Code implementation.\n\
1485 Mission: Implement changes according to the plan, write production code, verify it compiles.\n\
1486 Working directory: {wd}\n\n\
1487 Follow the plan from the incoming handoff. Use `edit` and `write` tools to make changes.\n\
1488 After implementing, run `bash` with `cargo check 2>&1` (cwd: {wd}) to verify.\n\
1489 Fix any compilation errors. Report what you implemented."
1490 ),
1491 vec!["implementation".into(), "coding".into(), "relay".into()],
1492 ),
1493 (
1494 "auto-reviewer".to_string(),
1495 format!(
1496 "You are @auto-reviewer.\n\
1497 Specialty: Code review and quality verification.\n\
1498 Mission: Review implemented changes, run quality checks, fix remaining issues.\n\
1499 Working directory: {wd}\n\n\
1500 Review the code changes from the incoming handoff. Run `bash` with `cargo check 2>&1` (cwd: {wd}).\n\
1501 Fix any remaining issues. Verify the implementation meets the acceptance criteria.\n\
1502 If complete, include STORY_COMPLETE in your output. If blocked, include STORY_BLOCKED."
1503 ),
1504 vec!["review".into(), "verification".into(), "relay".into()],
1505 ),
1506 ];
1507
1508 if max_agents >= 4 {
1509 profiles.push((
1510 "auto-tester".to_string(),
1511 format!(
1512 "You are @auto-tester.\n\
1513 Specialty: Test writing and verification.\n\
1514 Mission: Write tests for the implemented changes and ensure they pass.\n\
1515 Working directory: {wd}\n\n\
1516 Review the implementation, write appropriate tests, run them with `bash`.\n\
1517 Report test results and coverage gaps."
1518 ),
1519 vec!["testing".into(), "relay".into()],
1520 ));
1521 }
1522
1523 if max_agents >= 5 {
1524 profiles.push((
1525 "auto-refactorer".to_string(),
1526 format!(
1527 "You are @auto-refactorer.\n\
1528 Specialty: Code quality and lint compliance.\n\
1529 Mission: Run clippy, fix warnings, improve code quality without changing behavior.\n\
1530 Working directory: {wd}\n\n\
1531 Run `bash` with `cargo clippy --all-features 2>&1` (cwd: {wd}).\n\
1532 Fix warnings and improve naming, structure, error handling."
1533 ),
1534 vec!["quality".into(), "relay".into()],
1535 ));
1536 }
1537
1538 profiles.truncate(max_agents);
1539 profiles
1540 }
1541
1542 fn normalize_for_relay(text: &str) -> String {
1544 let mut normalized = String::with_capacity(text.len().min(512));
1545 let mut last_was_space = false;
1546 for ch in text.chars() {
1547 if ch.is_ascii_alphanumeric() {
1548 normalized.push(ch.to_ascii_lowercase());
1549 last_was_space = false;
1550 } else if ch.is_whitespace() && !last_was_space {
1551 normalized.push(' ');
1552 last_was_space = true;
1553 }
1554 if normalized.len() >= 280 {
1555 break;
1556 }
1557 }
1558 normalized.trim().to_string()
1559 }
1560
1561 fn prepare_relay_handoff(from_agent: &str, output: &str, original_task: &str) -> String {
1563 let max_len = 6_000;
1564 let relay_payload = if output.len() > max_len {
1565 let truncated: String = output.chars().take(max_len).collect();
1566 format!("{truncated}\n... (truncated)")
1567 } else {
1568 output.to_string()
1569 };
1570
1571 format!(
1572 "Original task:\n{original_task}\n\n\
1573 Incoming handoff from @{from_agent}:\n{relay_payload}\n\n\
1574 Continue the work from this handoff. Keep your response focused and provide concrete next steps."
1575 )
1576 }
1577
    /// Run a story prompt through a round-robin relay of agents and return the final baton.
    ///
    /// `max_agents` is clamped to `2..=8` and `max_rounds` to `1..=5`. One
    /// [`Session`] is created per profile from
    /// `build_implementation_relay_profiles`, each seeded with its instructions
    /// as a system message. Agents are then prompted in order, round after
    /// round, each receiving the current baton.
    ///
    /// The relay stops early when an agent's normalized output equals the
    /// previous agent's normalized output twice in a row, or when an output
    /// contains `STORY_COMPLETE` or `STORY_BLOCKED`. In both cases the raw
    /// output is returned. If every round runs to the end, the returned value
    /// is the handoff text prepared for the next (never-run) agent.
    ///
    /// Tool-call starts and each handoff are forwarded to `ralph_tx` as
    /// `RalphEvent::StoryToolCall` events when a sender is supplied.
    ///
    /// # Errors
    /// Fails if a session cannot be created, an agent's session is missing from
    /// the map, the spawned agent task cannot be joined, or the agent's prompt
    /// returns an error.
    async fn call_relay_static(
        registry: &Arc<ProviderRegistry>,
        model: &str,
        prompt: &str,
        working_dir: &Path,
        ralph_tx: Option<mpsc::Sender<RalphEvent>>,
        story_id: String,
        bus: Option<Arc<AgentBus>>,
        max_agents: usize,
        max_rounds: usize,
    ) -> anyhow::Result<String> {
        // Keep team size and round count within fixed bounds regardless of config.
        let max_agents = max_agents.clamp(2, 8);
        let max_rounds = max_rounds.clamp(1, 5);

        let profiles = Self::build_implementation_relay_profiles(max_agents, working_dir);

        // Use the caller's bus when given; otherwise a fresh bus for this relay only.
        let relay_bus = bus.unwrap_or_else(|| Arc::new(AgentBus::new()));
        let relay = ProtocolRelayRuntime::new(relay_bus.clone());

        let mut sessions: HashMap<String, Session> = HashMap::new();
        let mut ordered_agents: Vec<String> = Vec::new();
        let mut relay_profiles: Vec<RelayAgentProfile> = Vec::new();

        // One session per agent, primed with that agent's instructions.
        for (name, instructions, capabilities) in &profiles {
            let mut session = Session::new().await?;
            session.metadata.model = Some(model.to_string());
            session.set_agent_name(name.clone());
            session.bus = Some(relay_bus.clone());
            session.add_message(Message {
                role: Role::System,
                content: vec![ContentPart::Text {
                    text: instructions.clone(),
                }],
            });

            relay_profiles.push(RelayAgentProfile {
                name: name.clone(),
                capabilities: capabilities.clone(),
            });
            ordered_agents.push(name.clone());
            sessions.insert(name.clone(), session);
        }

        relay.register_agents(&relay_profiles);

        info!(
            story_id = %story_id,
            agents = ordered_agents.len(),
            rounds = max_rounds,
            "Starting relay team for story"
        );

        // `baton` is the text handed to the next agent; it starts as the story prompt.
        let mut baton = prompt.to_string();
        let mut previous_normalized: Option<String> = None;
        let mut convergence_hits = 0usize;
        let mut turns = 0usize;

        'relay_loop: for round in 1..=max_rounds {
            for idx in 0..ordered_agents.len() {
                let to = ordered_agents[idx].clone();
                // The first agent receives from "user" in round 1 and from the
                // last agent in later rounds; everyone else from their predecessor.
                let from = if idx == 0 {
                    if round == 1 {
                        "user".to_string()
                    } else {
                        ordered_agents[ordered_agents.len() - 1].clone()
                    }
                } else {
                    ordered_agents[idx - 1].clone()
                };

                turns += 1;
                relay.send_handoff(&from, &to, &baton);

                if let Some(ref tx) = ralph_tx {
                    let _ = tx
                        .send(RalphEvent::StoryToolCall {
                            story_id: story_id.clone(),
                            tool_name: format!(
                                "relay: @{from} → @{to} (round {round}/{max_rounds})"
                            ),
                        })
                        .await;
                }

                // The session is moved into the spawned task and put back afterwards.
                // NOTE(review): this and the later `?` exits return without calling
                // `relay.shutdown_agents` — confirm whether that cleanup is required.
                let Some(mut session) = sessions.remove(&to) else {
                    anyhow::bail!("Relay agent @{to} session unavailable for story {story_id}");
                };

                let (event_tx, mut event_rx) = mpsc::channel::<SessionEvent>(256);
                let registry_clone = registry.clone();
                let baton_clone = baton.clone();

                let join = tokio::spawn(async move {
                    let result = session
                        .prompt_with_events(&baton_clone, event_tx, registry_clone)
                        .await;
                    (session, result)
                });

                // Poll every 50 ms, draining session events while the agent runs so
                // tool-call starts are forwarded as they happen.
                while !join.is_finished() {
                    while let Ok(event) = event_rx.try_recv() {
                        if let Some(ref tx) = ralph_tx
                            && let SessionEvent::ToolCallStart { ref name, .. } = event
                        {
                            let _ = tx
                                .send(RalphEvent::StoryToolCall {
                                    story_id: story_id.clone(),
                                    tool_name: format!("@{to}: {name}"),
                                })
                                .await;
                        }
                    }
                    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                }

                let (updated_session, result) = join
                    .await
                    .map_err(|e| anyhow::anyhow!("Relay agent @{to} task error: {e}"))?;

                // Events still queued after the task finished are discarded, not forwarded.
                while event_rx.try_recv().is_ok() {}

                sessions.insert(to.clone(), updated_session);

                let output = result
                    .map_err(|e| {
                        anyhow::anyhow!("Relay agent @{to} failed on story {story_id}: {e}")
                    })?
                    .text;

                // Count consecutive turns whose normalized output repeats the previous one.
                let normalized = Self::normalize_for_relay(&output);
                if previous_normalized.as_deref() == Some(normalized.as_str()) {
                    convergence_hits += 1;
                } else {
                    convergence_hits = 0;
                }
                previous_normalized = Some(normalized);

                if convergence_hits >= 2 {
                    info!(story_id = %story_id, turns, "Relay converged");
                    baton = output;
                    break 'relay_loop;
                }

                // An explicit terminal marker from any agent ends the relay.
                if output.contains("STORY_COMPLETE") || output.contains("STORY_BLOCKED") {
                    info!(story_id = %story_id, turns, "Story reached terminal state via relay");
                    baton = output;
                    break 'relay_loop;
                }

                baton = Self::prepare_relay_handoff(&to, &output, prompt);
            }
        }

        relay.shutdown_agents(&ordered_agents);

        info!(
            story_id = %story_id,
            turns,
            convergence_hits,
            "Relay team completed"
        );

        Ok(baton)
    }
1754
1755 async fn resolve_conflicts_static(
1757 provider: &Arc<dyn Provider>,
1758 model: &str,
1759 working_dir: &Path,
1760 story: &UserStory,
1761 conflicts: &[String],
1762 conflict_diffs: &[(String, String)],
1763 bus: Option<Arc<AgentBus>>,
1764 ) -> anyhow::Result<bool> {
1765 info!(
1766 story_id = %story.id,
1767 num_conflicts = conflicts.len(),
1768 "Starting conflict resolution sub-agent"
1769 );
1770
1771 let conflict_info = conflict_diffs
1773 .iter()
1774 .map(|(file, diff)| format!("### File: {}\n```diff\n{}\n```", file, diff))
1775 .collect::<Vec<_>>()
1776 .join("\n\n");
1777
1778 let prompt = format!(
1779 r#"# CONFLICT RESOLUTION TASK
1780
1781## Story Context: {} - {}
1782{}
1783
1784## Conflicting Files
1785The following files have merge conflicts that need resolution:
1786{}
1787
1788## Conflict Details
1789{}
1790
1791## Your Task
17921. Read each conflicting file to see the conflict markers
17932. Understand what BOTH sides are trying to do:
1794 - HEAD (main branch): the current state
1795 - The incoming branch: the sub-agent's changes for story {}
17963. Resolve each conflict by:
1797 - Keeping BOTH changes if they don't actually conflict
1798 - Merging the logic if they touch the same code
1799 - Preferring the sub-agent's changes if they implement the story requirement
18004. Remove ALL conflict markers (<<<<<<<, =======, >>>>>>>)
18015. Ensure the final code compiles: run `cargo check`
1802
1803## CRITICAL RULES
1804- Do NOT leave any conflict markers in files
1805- Do NOT just pick one side - understand and merge the intent
1806- MUST run `cargo check` after resolving to verify
1807- Stage resolved files with `git add <file>`
1808
1809## Termination
1810SUCCESS: Output `CONFLICTS_RESOLVED` when all files are resolved and compile
1811FAILED: Output `CONFLICTS_UNRESOLVED: <reason>` if you cannot resolve
1812
1813Working directory: {}
1814"#,
1815 story.id,
1816 story.title,
1817 story.description,
1818 conflicts
1819 .iter()
1820 .map(|f| format!("- {}", f))
1821 .collect::<Vec<_>>()
1822 .join("\n"),
1823 conflict_info,
1824 story.id,
1825 working_dir.display()
1826 );
1827
1828 let system_prompt = crate::agent::builtin::build_system_prompt(working_dir);
1830
1831 let tool_registry =
1833 ToolRegistry::with_provider_arc(Arc::clone(provider), model.to_string());
1834
1835 let tool_definitions: Vec<_> = tool_registry
1836 .definitions()
1837 .into_iter()
1838 .filter(|t| t.name != "question")
1839 .collect();
1840
1841 info!(
1842 "Conflict resolver starting with {} tools",
1843 tool_definitions.len()
1844 );
1845
1846 let (output, steps, tool_calls, _exit_reason) = run_agent_loop(
1848 Arc::clone(provider),
1849 model,
1850 &system_prompt,
1851 &prompt,
1852 tool_definitions,
1853 tool_registry,
1854 15, 120, None,
1857 String::new(),
1858 bus.clone(),
1859 Some(working_dir.to_path_buf()),
1860 )
1861 .await?;
1862
1863 info!(
1864 story_id = %story.id,
1865 steps = steps,
1866 tool_calls = tool_calls,
1867 "Conflict resolver completed"
1868 );
1869
1870 let resolved = output.contains("CONFLICTS_RESOLVED")
1872 || (output.contains("resolved") && !output.contains("UNRESOLVED"));
1873
1874 if resolved {
1875 info!(story_id = %story.id, "Conflicts resolved successfully");
1876 } else {
1877 warn!(
1878 story_id = %story.id,
1879 output = %output.chars().take(200).collect::<String>(),
1880 "Conflict resolution may have failed"
1881 );
1882 }
1883
1884 Ok(resolved)
1885 }
1886
1887 fn extract_learnings_static(response: &str) -> Vec<String> {
1889 response
1890 .lines()
1891 .filter(|line| {
1892 line.contains("learned") || line.contains("Learning") || line.contains("# What")
1893 })
1894 .map(|line| line.trim().to_string())
1895 .collect()
1896 }
1897
1898 fn commit_in_dir(&self, dir: &PathBuf, story: &UserStory) -> anyhow::Result<()> {
1900 let _ = Command::new("git")
1902 .args(["add", "-A"])
1903 .current_dir(dir)
1904 .output();
1905
1906 let msg = format!("feat({}): {}", story.id.to_lowercase(), story.title);
1908 let provenance = self.commit_provenance();
1909 let _ = git_commit_with_provenance_blocking(dir, &msg, Some(&provenance));
1910
1911 Ok(())
1912 }
1913
1914 fn find_cargo_root(dir: &Path) -> PathBuf {
1919 fn has_cargo_toml(path: &Path) -> bool {
1920 path.join("Cargo.toml").exists()
1921 }
1922
1923 if has_cargo_toml(dir) {
1925 return dir.to_path_buf();
1926 }
1927
1928 if let Ok(entries) = std::fs::read_dir(dir) {
1930 let mut subdirs: Vec<_> = entries
1931 .filter_map(|e| e.ok())
1932 .filter(|e| e.path().is_dir())
1933 .map(|e| e.path())
1934 .collect();
1935
1936 subdirs.sort();
1938
1939 for subdir in subdirs {
1940 if has_cargo_toml(&subdir) {
1941 return subdir;
1942 }
1943
1944 if let Ok(sub_entries) = std::fs::read_dir(&subdir) {
1946 let mut sub_subdirs: Vec<_> = sub_entries
1947 .filter_map(|e| e.ok())
1948 .filter(|e| e.path().is_dir())
1949 .map(|e| e.path())
1950 .collect();
1951
1952 sub_subdirs.sort();
1953
1954 for sub_subdir in sub_subdirs {
1955 if has_cargo_toml(&sub_subdir) {
1956 return sub_subdir;
1957 }
1958 }
1959 }
1960 }
1961 }
1962
1963 warn!(
1965 dir = %dir.display(),
1966 "No Cargo.toml found, using worktree root for cargo commands"
1967 );
1968 dir.to_path_buf()
1969 }
1970
1971 async fn run_quality_gates_in_dir_with_events(
1973 &self,
1974 dir: &Path,
1975 story_id: &str,
1976 ) -> anyhow::Result<bool> {
1977 let cargo_root = Self::find_cargo_root(dir);
1979 debug!(
1980 worktree_dir = %dir.display(),
1981 cargo_root = %cargo_root.display(),
1982 "Running quality gates"
1983 );
1984
1985 let checks = &self.state.prd.quality_checks;
1986 let mut all_passed = true;
1987
1988 for (name, cmd) in [
1989 ("typecheck", &checks.typecheck),
1990 ("lint", &checks.lint),
1991 ("test", &checks.test),
1992 ("build", &checks.build),
1993 ] {
1994 if let Some(command) = cmd {
1995 debug!("Running {} check in {:?}: {}", name, cargo_root, command);
1996 let output = Command::new("/bin/sh")
1997 .arg("-c")
1998 .arg(command)
1999 .current_dir(&cargo_root)
2000 .output()
2001 .map_err(|e| {
2002 anyhow::anyhow!("Failed to run quality check '{}': {}", name, e)
2003 })?;
2004
2005 let passed = output.status.success();
2006 self.try_send_event(RalphEvent::StoryQualityCheck {
2007 story_id: story_id.to_string(),
2008 check_name: name.to_string(),
2009 passed,
2010 });
2011
2012 if !passed {
2013 let stderr = String::from_utf8_lossy(&output.stderr);
2014 let stdout = String::from_utf8_lossy(&output.stdout);
2015 let combined = format!("{}\n{}", stdout, stderr);
2016 let error_summary: String = combined
2017 .lines()
2018 .filter(|line| {
2019 line.starts_with("error")
2020 || line.contains("error:")
2021 || line.contains("error[")
2022 })
2023 .take(5)
2024 .collect::<Vec<_>>()
2025 .join("\n");
2026 warn!(
2027 check = %name,
2028 cargo_root = %cargo_root.display(),
2029 error_summary = %error_summary.chars().take(300).collect::<String>(),
2030 "{} check failed in {:?}",
2031 name,
2032 cargo_root
2033 );
2034 all_passed = false;
2035 break; }
2037 }
2038 }
2039
2040 Ok(all_passed)
2041 }
2042
2043 fn build_prompt(&self, story: &UserStory) -> String {
2045 let progress = self.load_progress().unwrap_or_default();
2046
2047 format!(
2048 r#"# PRD: {} - {}
2049
2050## Current Story: {} - {}
2051
2052{}
2053
2054### Acceptance Criteria:
2055{}
2056
2057## Previous Progress:
2058{}
2059
2060## Instructions:
20611. Implement the requirements for this story
20622. Write any necessary code changes
20633. Document what you learned
20644. End with `STORY_COMPLETE: {}` when done
2065
2066Respond with the implementation and any shell commands needed.
2067"#,
2068 self.state.prd.project,
2069 self.state.prd.feature,
2070 story.id,
2071 story.title,
2072 story.description,
2073 story
2074 .acceptance_criteria
2075 .iter()
2076 .map(|c| format!("- {}", c))
2077 .collect::<Vec<_>>()
2078 .join("\n"),
2079 if progress.is_empty() {
2080 "None yet".to_string()
2081 } else {
2082 progress
2083 },
2084 story.id
2085 )
2086 }
2087
2088 async fn call_llm(&self, story_id: &str, prompt: &str) -> anyhow::Result<String> {
2090 let system_prompt = crate::agent::builtin::build_system_prompt(&self.state.working_dir);
2092
2093 let tool_registry =
2095 ToolRegistry::with_provider_arc(Arc::clone(&self.provider), self.model.clone());
2096
2097 let tool_definitions: Vec<_> = tool_registry
2099 .definitions()
2100 .into_iter()
2101 .filter(|t| t.name != "question")
2102 .collect();
2103
2104 info!(
2105 "Ralph agent starting with {} tools in {:?}",
2106 tool_definitions.len(),
2107 self.state.working_dir
2108 );
2109
2110 let (bridge_tx, _bridge_handle) = if let Some(ref ralph_tx) = self.event_tx {
2112 let (tx, handle) = Self::create_swarm_event_bridge(ralph_tx, story_id.to_string());
2113 (Some(tx), Some(handle))
2114 } else {
2115 (None, None)
2116 };
2117
2118 let (output, steps, tool_calls, exit_reason) = run_agent_loop(
2120 Arc::clone(&self.provider),
2121 &self.model,
2122 &system_prompt,
2123 prompt,
2124 tool_definitions,
2125 tool_registry, 30, 180, bridge_tx,
2129 story_id.to_string(),
2130 self.bus.clone(),
2131 Some(self.state.working_dir.clone()),
2132 )
2133 .await?;
2134
2135 match exit_reason {
2137 AgentLoopExit::Completed => {
2138 info!(
2139 "Ralph agent completed: {} steps, {} tool calls",
2140 steps, tool_calls
2141 );
2142 }
2143 AgentLoopExit::MaxStepsReached => {
2144 warn!(
2145 story_id = %story_id,
2146 steps = steps,
2147 tool_calls = tool_calls,
2148 "Ralph sub-agent hit max steps limit - work may be incomplete"
2149 );
2150 }
2151 AgentLoopExit::TimedOut => {
2152 warn!(
2153 story_id = %story_id,
2154 steps = steps,
2155 tool_calls = tool_calls,
2156 "Ralph sub-agent timed out - work may be incomplete"
2157 );
2158 }
2159 }
2160
2161 Ok(output)
2162 }
2163
2164 async fn run_quality_gates_with_events(&self, story_id: &str) -> anyhow::Result<bool> {
2166 let cargo_root = Self::find_cargo_root(&self.state.working_dir);
2168 debug!(
2169 working_dir = %self.state.working_dir.display(),
2170 cargo_root = %cargo_root.display(),
2171 "Running quality gates"
2172 );
2173
2174 let checks = &self.state.prd.quality_checks;
2175 let mut all_passed = true;
2176
2177 for (name, cmd) in [
2178 ("typecheck", &checks.typecheck),
2179 ("lint", &checks.lint),
2180 ("test", &checks.test),
2181 ("build", &checks.build),
2182 ] {
2183 if let Some(command) = cmd {
2184 debug!("Running {} check in {:?}: {}", name, cargo_root, command);
2185 let output = Command::new("/bin/sh")
2186 .arg("-c")
2187 .arg(command)
2188 .current_dir(&cargo_root)
2189 .output()
2190 .map_err(|e| {
2191 anyhow::anyhow!("Failed to run quality check '{}': {}", name, e)
2192 })?;
2193
2194 let passed = output.status.success();
2195 self.try_send_event(RalphEvent::StoryQualityCheck {
2196 story_id: story_id.to_string(),
2197 check_name: name.to_string(),
2198 passed,
2199 });
2200
2201 if !passed {
2202 let stderr = String::from_utf8_lossy(&output.stderr);
2203 let stdout = String::from_utf8_lossy(&output.stdout);
2204 let combined = format!("{}\n{}", stdout, stderr);
2205 let error_summary: String = combined
2206 .lines()
2207 .filter(|line| {
2208 line.starts_with("error")
2209 || line.contains("error:")
2210 || line.contains("error[")
2211 })
2212 .take(5)
2213 .collect::<Vec<_>>()
2214 .join("\n");
2215 warn!(
2216 check = %name,
2217 cargo_root = %cargo_root.display(),
2218 error_summary = %error_summary.chars().take(300).collect::<String>(),
2219 "{} check failed in {:?}",
2220 name, cargo_root
2221 );
2222 all_passed = false;
2223 break; }
2225 }
2226 }
2227
2228 Ok(all_passed)
2229 }
2230
2231 fn commit_story(&self, story: &UserStory) -> anyhow::Result<()> {
2233 info!("Committing changes for story: {}", story.id);
2234
2235 let _ = Command::new("git")
2237 .args(["add", "-A"])
2238 .current_dir(&self.state.working_dir)
2239 .output();
2240
2241 let msg = format!("feat({}): {}", story.id.to_lowercase(), story.title);
2243 let provenance = self.commit_provenance();
2244 match git_commit_with_provenance_blocking(&self.state.working_dir, &msg, Some(&provenance))
2245 {
2246 Ok(output) if output.status.success() => {
2247 info!("Committed: {}", msg);
2248 }
2249 Ok(output) => {
2250 warn!(
2251 "Git commit had no changes or failed: {}",
2252 String::from_utf8_lossy(&output.stderr)
2253 );
2254 }
2255 Err(e) => {
2256 warn!("Could not run git commit: {}", e);
2257 }
2258 }
2259
2260 Ok(())
2261 }
2262
2263 fn commit_provenance(&self) -> ExecutionProvenance {
2264 let mut provenance = ExecutionProvenance::for_operation("ralph", ExecutionOrigin::Ralph);
2265 provenance.set_run_id(self.run_id.clone());
2266 provenance
2267 }
2268
2269 fn git_checkout(&self, branch: &str) -> anyhow::Result<()> {
2271 let output = Command::new("git")
2273 .args(["checkout", branch])
2274 .current_dir(&self.state.working_dir)
2275 .output()?;
2276
2277 if !output.status.success() {
2278 Command::new("git")
2279 .args(["checkout", "-b", branch])
2280 .current_dir(&self.state.working_dir)
2281 .output()?;
2282 }
2283
2284 Ok(())
2285 }
2286
2287 fn load_progress(&self) -> anyhow::Result<String> {
2289 let path = self.state.working_dir.join(&self.config.progress_path);
2290 Ok(std::fs::read_to_string(path).unwrap_or_default())
2291 }
2292
2293 fn append_progress(&self, entry: &ProgressEntry, response: &str) -> anyhow::Result<()> {
2295 let path = self.state.working_dir.join(&self.config.progress_path);
2296 let mut content = self.load_progress().unwrap_or_default();
2297
2298 content.push_str(&format!(
2299 "\n---\n\n## Iteration {} - {} ({})\n\n**Status:** {}\n\n### Summary\n{}\n",
2300 entry.iteration, entry.story_id, entry.timestamp, entry.status, response
2301 ));
2302
2303 std::fs::write(path, content)?;
2304 Ok(())
2305 }
2306
2307 fn extract_learnings(&self, response: &str) -> Vec<String> {
2309 let mut learnings = Vec::new();
2310
2311 for line in response.lines() {
2312 if line.contains("learned") || line.contains("Learning") || line.contains("# What") {
2313 learnings.push(line.trim().to_string());
2314 }
2315 }
2316
2317 learnings
2318 }
2319
    /// Borrow the loop's current state (PRD, iteration counters, status and progress log).
    pub fn status(&self) -> &RalphState {
        &self.state
    }
2324
2325 pub fn status_markdown(&self) -> String {
2327 let status = if self.state.prd.is_complete() {
2328 "# Ralph Complete!"
2329 } else {
2330 "# Ralph Status"
2331 };
2332
2333 let stories: Vec<String> = self
2334 .state
2335 .prd
2336 .user_stories
2337 .iter()
2338 .map(|s| {
2339 let check = if s.passes { "[x]" } else { "[ ]" };
2340 format!("- {} {}: {}", check, s.id, s.title)
2341 })
2342 .collect();
2343
2344 format!(
2345 "{}\n\n**Project:** {}\n**Feature:** {}\n**Progress:** {}/{} stories\n**Iterations:** {}/{}\n\n## Stories\n{}",
2346 status,
2347 self.state.prd.project,
2348 self.state.prd.feature,
2349 self.state.prd.passed_count(),
2350 self.state.prd.user_stories.len(),
2351 self.state.current_iteration,
2352 self.state.max_iterations,
2353 stories.join("\n")
2354 )
2355 }
2356}
2357
2358pub fn create_prd_template(project: &str, feature: &str) -> Prd {
2360 Prd {
2361 project: project.to_string(),
2362 feature: feature.to_string(),
2363 branch_name: format!("feature/{}", feature.to_lowercase().replace(' ', "-")),
2364 version: "1.0".to_string(),
2365 user_stories: vec![UserStory {
2366 id: "US-001".to_string(),
2367 title: "First user story".to_string(),
2368 description: "Description of what needs to be implemented".to_string(),
2369 acceptance_criteria: vec!["Criterion 1".to_string(), "Criterion 2".to_string()],
2370 verification_steps: Vec::new(),
2371 passes: false,
2372 priority: 1,
2373 depends_on: Vec::new(),
2374 complexity: 3,
2375 }],
2376 technical_requirements: Vec::new(),
2377 quality_checks: QualityChecks {
2378 typecheck: Some("cargo check".to_string()),
2379 test: Some("cargo test".to_string()),
2380 lint: Some("cargo clippy".to_string()),
2381 build: Some("cargo build".to_string()),
2382 },
2383 created_at: chrono::Utc::now().to_rfc3339(),
2384 updated_at: chrono::Utc::now().to_rfc3339(),
2385 }
2386}