1use super::state_store::{RalphRunState, RalphStateStore, StoryResultEntry};
4use super::types::*;
5use super::verification::run_story_verification;
6use crate::bus::AgentBus;
7use crate::bus::relay::{ProtocolRelayRuntime, RelayAgentProfile};
8use crate::provenance::{
9 ExecutionOrigin, ExecutionProvenance, git_commit_with_provenance_blocking,
10};
11use crate::provider::{ContentPart, Message, Provider, ProviderRegistry, Role};
12use crate::session::delegation::DelegationState;
13use crate::session::{Session, SessionEvent};
14use crate::swarm::{executor::AgentLoopExit, run_agent_loop};
15use crate::tool::ToolRegistry;
16use crate::tui::ralph_view::{RalphEvent, RalphStoryInfo, RalphStoryStatus};
17use crate::tui::swarm_view::SwarmEvent;
18use crate::worktree::WorktreeManager;
19use futures::stream::{self, StreamExt};
20use std::collections::HashMap;
21use std::path::{Path, PathBuf};
22use std::process::Command;
23use std::sync::Arc;
24use tokio::sync::mpsc;
25use tracing::{debug, info, warn};
26
/// Drives the user stories of a PRD through the Ralph implementation loop.
///
/// Construct with [`RalphLoop::new`], optionally configure through the
/// `with_*` builder methods, then call `run`.
pub struct RalphLoop {
    /// Live run state: the loaded PRD, iteration counters, status and progress log.
    state: RalphState,
    /// Provider handle supplied at construction.
    provider: Arc<dyn Provider>,
    /// Model identifier supplied at construction.
    model: String,
    /// Loop configuration (iteration limit, parallel/worktree/relay switches, ...).
    config: RalphConfig,
    /// Optional channel on which `RalphEvent`s are emitted while the loop runs.
    event_tx: Option<mpsc::Sender<RalphEvent>>,
    /// Optional agent bus used to publish and collect learnings, handoffs and progress.
    bus: Option<Arc<AgentBus>>,
    /// Optional provider registry; consulted when relay execution is enabled.
    registry: Option<Arc<ProviderRegistry>>,
    /// Optional state store that receives run and story-result updates.
    store: Option<Arc<dyn RalphStateStore>>,
    /// Optional delegation state forwarded to relay calls.
    delegation: Option<DelegationState>,
    /// Identifier of this run in the state store; a random UUID v4 unless overridden.
    run_id: String,
}
41
42impl RalphLoop {
43 pub async fn new(
45 prd_path: PathBuf,
46 provider: Arc<dyn Provider>,
47 model: String,
48 config: RalphConfig,
49 ) -> anyhow::Result<Self> {
50 let prd = Prd::load(&prd_path).await?;
51
52 let working_dir = if let Some(parent) = prd_path.parent() {
54 if parent.as_os_str().is_empty() {
55 std::env::current_dir()?
56 } else {
57 parent.to_path_buf()
58 }
59 } else {
60 std::env::current_dir()?
61 };
62
63 info!(
64 "Loaded PRD: {} - {} ({} stories, {} already passed)",
65 prd.project,
66 prd.feature,
67 prd.user_stories.len(),
68 prd.passed_count()
69 );
70
71 let resumed_iterations = prd.passed_count();
73
74 let state = RalphState {
75 prd,
76 current_iteration: resumed_iterations,
77 max_iterations: config.max_iterations,
78 status: RalphStatus::Pending,
79 progress_log: Vec::new(),
80 prd_path: prd_path.clone(),
81 working_dir,
82 };
83
84 Ok(Self {
85 state,
86 provider,
87 model,
88 config,
89 event_tx: None,
90 bus: None,
91 registry: None,
92 store: None,
93 delegation: None,
94 run_id: uuid::Uuid::new_v4().to_string(),
95 })
96 }
97
98 pub fn with_event_tx(mut self, tx: mpsc::Sender<RalphEvent>) -> Self {
100 self.event_tx = Some(tx);
101 self
102 }
103
104 pub fn with_bus(mut self, bus: Arc<AgentBus>) -> Self {
106 self.bus = Some(bus);
107 self
108 }
109
110 pub fn with_registry(mut self, registry: Arc<ProviderRegistry>) -> Self {
112 self.registry = Some(registry);
113 self
114 }
115
116 pub fn with_delegation(mut self, state: DelegationState) -> Self {
118 self.delegation = Some(state);
119 self
120 }
121
122 pub fn with_store(mut self, store: Arc<dyn RalphStateStore>) -> Self {
124 self.store = Some(store);
125 self
126 }
127
128 pub fn with_run_id(mut self, run_id: String) -> Self {
130 self.run_id = run_id;
131 self
132 }
133
134 fn store_fire_and_forget(
136 &self,
137 fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
138 ) {
139 tokio::spawn(async move {
140 if let Err(e) = fut.await {
141 warn!(error = %e, "State store update failed");
142 }
143 });
144 }
145
146 fn try_send_event(&self, event: RalphEvent) {
148 if let Some(ref tx) = self.event_tx {
149 let _ = tx.try_send(event);
150 }
151 }
152
153 fn record_story_result(&self, story: &UserStory, passed: bool, error: Option<String>) {
154 if let Some(ref store) = self.store {
155 let s = store.clone();
156 let rid = self.run_id.clone();
157 let entry = StoryResultEntry {
158 story_id: story.id.clone(),
159 title: story.title.clone(),
160 passed,
161 iteration: self.state.current_iteration,
162 error,
163 };
164 self.store_fire_and_forget(async move { s.record_story_result(&rid, &entry).await });
165 }
166 }
167
168 fn bus_publish_story_result(
170 &self,
171 story: &UserStory,
172 iteration: usize,
173 learnings: &[String],
174 next_story_id: Option<&str>,
175 ) {
176 let Some(ref bus) = self.bus else { return };
177 let prd_id = &self.state.prd.project;
178 let handle = bus.handle(format!("ralph.{}", story.id));
179
180 handle.publish_ralph_learning(
182 prd_id,
183 &story.id,
184 iteration,
185 learnings.to_vec(),
186 serde_json::json!({
187 "story_title": story.title,
188 "passed": story.passes,
189 }),
190 );
191
192 if let Some(next_id) = next_story_id {
194 handle.publish_ralph_handoff(
195 prd_id,
196 &story.id,
197 next_id,
198 serde_json::json!({ "learnings": learnings }),
199 &format!(
200 "{} {} (iteration {})",
201 story.id,
202 if story.passes { "passed" } else { "failed" },
203 iteration
204 ),
205 );
206 }
207
208 handle.publish_ralph_progress(
210 prd_id,
211 self.state.prd.passed_count(),
212 self.state.prd.user_stories.len(),
213 iteration,
214 &format!("{:?}", self.state.status),
215 );
216 }
217
218 fn bus_collect_learnings(&self) -> Vec<String> {
220 let Some(ref bus) = self.bus else {
221 return Vec::new();
222 };
223 let prd_id = &self.state.prd.project;
224 let mut handle = bus.handle("ralph-collector");
225 let envelopes = handle.drain_ralph_learnings(prd_id);
226 let mut learnings = Vec::new();
227 for env in envelopes {
228 match env.message {
229 crate::bus::BusMessage::RalphLearning {
230 story_id,
231 iteration,
232 learnings: items,
233 ..
234 } => {
235 for l in items {
236 learnings.push(format!("[{story_id} iter {iteration}] {l}"));
237 }
238 }
239 crate::bus::BusMessage::RalphHandoff {
240 from_story,
241 progress_summary,
242 ..
243 } => {
244 learnings.push(format!("[handoff from {from_story}] {progress_summary}"));
245 }
246 _ => {}
247 }
248 }
249 learnings
250 }
251
252 fn create_swarm_event_bridge(
256 ralph_tx: &mpsc::Sender<RalphEvent>,
257 story_id: String,
258 ) -> (mpsc::Sender<SwarmEvent>, tokio::task::JoinHandle<()>) {
259 let (swarm_tx, mut swarm_rx) = mpsc::channel::<SwarmEvent>(100);
260 let ralph_tx = ralph_tx.clone();
261 let handle = tokio::spawn(async move {
262 while let Some(event) = swarm_rx.recv().await {
263 let ralph_event = match event {
264 SwarmEvent::AgentToolCall { tool_name, .. } => RalphEvent::StoryToolCall {
265 story_id: story_id.clone(),
266 tool_name,
267 },
268 SwarmEvent::AgentToolCallDetail { detail, .. } => {
269 RalphEvent::StoryToolCallDetail {
270 story_id: story_id.clone(),
271 detail,
272 }
273 }
274 SwarmEvent::AgentMessage { entry, .. } => RalphEvent::StoryMessage {
275 story_id: story_id.clone(),
276 entry,
277 },
278 SwarmEvent::AgentOutput { output, .. } => RalphEvent::StoryOutput {
279 story_id: story_id.clone(),
280 output,
281 },
282 SwarmEvent::AgentError { error, .. } => RalphEvent::StoryError {
283 story_id: story_id.clone(),
284 error,
285 },
286 _ => continue, };
288 if ralph_tx.send(ralph_event).await.is_err() {
289 break;
290 }
291 }
292 });
293 (swarm_tx, handle)
294 }
295
296 fn build_story_infos(prd: &Prd) -> Vec<RalphStoryInfo> {
298 prd.user_stories
299 .iter()
300 .map(|s| RalphStoryInfo {
301 id: s.id.clone(),
302 title: s.title.clone(),
303 status: if s.passes {
304 RalphStoryStatus::Passed
305 } else {
306 RalphStoryStatus::Pending
307 },
308 priority: s.priority,
309 depends_on: s.depends_on.clone(),
310 quality_checks: Vec::new(),
311 tool_call_history: Vec::new(),
312 messages: Vec::new(),
313 output: None,
314 error: None,
315 merge_summary: None,
316 steps: 0,
317 current_tool: None,
318 })
319 .collect()
320 }
321
322 pub async fn run(&mut self) -> anyhow::Result<RalphState> {
324 self.state.status = RalphStatus::Running;
325
326 if let Some(ref store) = self.store {
328 let initial_state = RalphRunState {
329 run_id: self.run_id.clone(),
330 okr_id: None,
331 prd: self.state.prd.clone(),
332 config: self.config.clone(),
333 status: RalphStatus::Running,
334 current_iteration: 0,
335 max_iterations: self.state.max_iterations,
336 progress_log: Vec::new(),
337 story_results: Vec::new(),
338 error: None,
339 created_at: chrono::Utc::now().to_rfc3339(),
340 started_at: Some(chrono::Utc::now().to_rfc3339()),
341 completed_at: None,
342 };
343 let s = store.clone();
344 self.store_fire_and_forget(async move { s.create_run(&initial_state).await });
345 }
346
347 self.try_send_event(RalphEvent::Started {
349 project: self.state.prd.project.clone(),
350 feature: self.state.prd.feature.clone(),
351 stories: Self::build_story_infos(&self.state.prd),
352 max_iterations: self.state.max_iterations,
353 });
354
355 if !self.state.prd.branch_name.is_empty() {
357 info!("Switching to branch: {}", self.state.prd.branch_name);
358 self.git_checkout(&self.state.prd.branch_name)?;
359 }
360
361 if self.config.parallel_enabled {
363 self.run_parallel().await?;
364 } else {
365 self.run_sequential().await?;
366 }
367
368 if self.state.status != RalphStatus::Completed {
372 let passed = self.state.prd.passed_count();
373 let total = self.state.prd.user_stories.len();
374
375 if self.state.prd.is_complete() {
376 self.state.status = RalphStatus::Completed;
377 } else if self.state.current_iteration >= self.state.max_iterations {
378 if self.state.current_iteration > 0 && passed == 0 && total > 0 {
379 self.state.status = RalphStatus::QualityFailed;
381 warn!(
382 iterations = self.state.current_iteration,
383 passed = passed,
384 total = total,
385 "Ralph failed: all stories failed quality checks"
386 );
387 } else {
388 self.state.status = RalphStatus::MaxIterations;
390 info!(
391 iterations = self.state.current_iteration,
392 passed = passed,
393 total = total,
394 "Ralph finished with partial progress"
395 );
396 }
397 } else if self.state.current_iteration > 0 {
398 if passed == 0 && total > 0 {
401 self.state.status = RalphStatus::QualityFailed;
402 warn!(
403 iterations = self.state.current_iteration,
404 passed = passed,
405 total = total,
406 "Ralph ended with no passing stories"
407 );
408 } else {
409 self.state.status = RalphStatus::MaxIterations;
410 info!(
411 iterations = self.state.current_iteration,
412 passed = passed,
413 total = total,
414 "Ralph ended before full completion"
415 );
416 }
417 } else {
418 self.state.status = RalphStatus::MaxIterations;
419 }
420 }
421
422 if self.config.worktree_enabled {
424 let mgr = WorktreeManager::new(&self.state.working_dir);
425 match mgr.cleanup_all().await {
426 Ok(count) if count > 0 => {
427 info!(cleaned = count, "Cleaned up orphaned worktrees/branches");
428 }
429 Ok(_) => {}
430 Err(e) => {
431 warn!(error = %e, "Failed to cleanup orphaned worktrees");
432 }
433 }
434 }
435
436 if let Some(ref store) = self.store {
438 let s = store.clone();
439 let rid = self.run_id.clone();
440 let status = self.state.status;
441 let prd = self.state.prd.clone();
442 self.store_fire_and_forget(async move {
443 s.update_prd(&rid, &prd).await?;
444 s.complete_run(&rid, status).await
445 });
446 }
447
448 info!(
449 "Ralph finished: {:?}, {}/{} stories passed",
450 self.state.status,
451 self.state.prd.passed_count(),
452 self.state.prd.user_stories.len()
453 );
454
455 self.try_send_event(RalphEvent::Complete {
457 status: format!("{:?}", self.state.status),
458 passed: self.state.prd.passed_count(),
459 total: self.state.prd.user_stories.len(),
460 });
461
462 Ok(self.state.clone())
463 }
464
465 async fn run_sequential(&mut self) -> anyhow::Result<()> {
467 while self.state.current_iteration < self.state.max_iterations {
468 self.state.current_iteration += 1;
469
470 if let Some(ref store) = self.store {
472 let s = store.clone();
473 let rid = self.run_id.clone();
474 let iter = self.state.current_iteration;
475 self.store_fire_and_forget(async move { s.update_iteration(&rid, iter).await });
476 }
477
478 info!(
479 "=== Ralph iteration {} of {} ===",
480 self.state.current_iteration, self.state.max_iterations
481 );
482
483 self.try_send_event(RalphEvent::IterationStarted {
485 iteration: self.state.current_iteration,
486 max_iterations: self.state.max_iterations,
487 });
488
489 if self.state.prd.is_complete() {
491 info!("All stories complete!");
492 self.state.status = RalphStatus::Completed;
493 break;
494 }
495
496 let story = match self.state.prd.next_story() {
498 Some(s) => s.clone(),
499 None => {
500 warn!("No available stories (dependencies not met)");
501 break;
502 }
503 };
504
505 info!("Working on story: {} - {}", story.id, story.title);
506
507 self.try_send_event(RalphEvent::StoryStarted {
509 story_id: story.id.clone(),
510 });
511
512 let bus_learnings = self.bus_collect_learnings();
514
515 let prompt = if bus_learnings.is_empty() {
517 self.build_prompt(&story)
518 } else {
519 let mut p = self.build_prompt(&story);
520 p.push_str("\n## Learnings from Previous Iterations:\n");
521 for l in &bus_learnings {
522 p.push_str(&format!("- {l}\n"));
523 }
524 p
525 };
526
527 match self.call_llm(&story.id, &prompt).await {
529 Ok(response) => {
530 let entry = ProgressEntry {
532 story_id: story.id.clone(),
533 iteration: self.state.current_iteration,
534 status: "completed".to_string(),
535 learnings: self.extract_learnings(&response),
536 files_changed: Vec::new(),
537 timestamp: chrono::Utc::now().to_rfc3339(),
538 };
539 self.append_progress(&entry, &response)?;
540 let entry_learnings = entry.learnings.clone();
541 self.state.progress_log.push(entry);
542
543 if self.config.quality_checks_enabled {
545 if self.run_quality_gates_with_events(&story.id).await? {
546 info!("Story {} passed quality checks!", story.id);
547 if let Err(error) =
548 run_story_verification(&self.state.working_dir, &story).await
549 {
550 let error = format!("Verification steps failed: {error}");
551 warn!(story_id = %story.id, error = %error);
552 self.record_story_result(&story, false, Some(error.clone()));
553 self.try_send_event(RalphEvent::StoryComplete {
554 story_id: story.id.clone(),
555 passed: false,
556 });
557 continue;
558 }
559 self.state.prd.mark_passed(&story.id);
560
561 self.record_story_result(&story, true, None);
563
564 self.try_send_event(RalphEvent::StoryComplete {
565 story_id: story.id.clone(),
566 passed: true,
567 });
568
569 let next = self.state.prd.next_story().map(|s| s.id.clone());
571 self.bus_publish_story_result(
572 &story,
573 self.state.current_iteration,
574 &entry_learnings,
575 next.as_deref(),
576 );
577
578 if self.config.auto_commit {
580 self.commit_story(&story)?;
581 }
582
583 self.state.prd.save(&self.state.prd_path).await?;
585 } else {
586 warn!("Story {} failed quality checks", story.id);
587
588 self.record_story_result(
590 &story,
591 false,
592 Some("Quality checks failed".to_string()),
593 );
594
595 let next = self.state.prd.next_story().map(|s| s.id.clone());
597 self.bus_publish_story_result(
598 &story,
599 self.state.current_iteration,
600 &entry_learnings,
601 next.as_deref(),
602 );
603
604 self.try_send_event(RalphEvent::StoryComplete {
605 story_id: story.id.clone(),
606 passed: false,
607 });
608 }
609 } else {
610 if let Err(error) =
612 run_story_verification(&self.state.working_dir, &story).await
613 {
614 let error = format!("Verification steps failed: {error}");
615 warn!(story_id = %story.id, error = %error);
616 self.record_story_result(&story, false, Some(error.clone()));
617 self.try_send_event(RalphEvent::StoryComplete {
618 story_id: story.id.clone(),
619 passed: false,
620 });
621 continue;
622 }
623 self.state.prd.mark_passed(&story.id);
624 self.state.prd.save(&self.state.prd_path).await?;
625
626 self.record_story_result(&story, true, None);
628
629 self.try_send_event(RalphEvent::StoryComplete {
630 story_id: story.id.clone(),
631 passed: true,
632 });
633 }
634 }
635 Err(e) => {
636 warn!("LLM call failed: {}", e);
637 let entry = ProgressEntry {
638 story_id: story.id.clone(),
639 iteration: self.state.current_iteration,
640 status: format!("failed: {}", e),
641 learnings: Vec::new(),
642 files_changed: Vec::new(),
643 timestamp: chrono::Utc::now().to_rfc3339(),
644 };
645 self.state.progress_log.push(entry);
646
647 self.try_send_event(RalphEvent::StoryError {
648 story_id: story.id.clone(),
649 error: format!("{}", e),
650 });
651 }
652 }
653 }
654
655 Ok(())
656 }
657
658 async fn run_parallel(&mut self) -> anyhow::Result<()> {
660 let stages: Vec<Vec<UserStory>> = self
662 .state
663 .prd
664 .stages()
665 .into_iter()
666 .map(|stage| stage.into_iter().cloned().collect())
667 .collect();
668 let total_stages = stages.len();
669
670 info!(
671 "Parallel execution: {} stages, {} max concurrent stories",
672 total_stages, self.config.max_concurrent_stories
673 );
674
675 let worktree_mgr = if self.config.worktree_enabled {
677 let mgr = WorktreeManager::new(&self.state.working_dir);
678 info!("Worktree isolation enabled for parallel stories");
679 Some(Arc::new(mgr))
680 } else {
681 None
682 };
683
684 for (stage_idx, stage_stories) in stages.into_iter().enumerate() {
685 if self.state.prd.is_complete() {
686 info!("All stories complete!");
687 self.state.status = RalphStatus::Completed;
688 break;
689 }
690
691 if self.state.current_iteration >= self.state.max_iterations {
692 break;
693 }
694
695 let story_count = stage_stories.len();
696 info!(
697 "=== Stage {}/{}: {} stories in parallel ===",
698 stage_idx + 1,
699 total_stages,
700 story_count
701 );
702
703 let stories: Vec<UserStory> = stage_stories;
705
706 let mut worktrees_by_story: HashMap<String, crate::worktree::WorktreeInfo> =
712 HashMap::new();
713 if let Some(ref mgr) = worktree_mgr {
714 let ids: Vec<String> = stories.iter().map(|s| s.id.clone()).collect();
715 let created: Vec<(String, Option<crate::worktree::WorktreeInfo>)> =
716 stream::iter(ids.into_iter())
717 .map(|story_id| {
718 let mgr = Arc::clone(mgr);
719 async move {
720 let slug = story_id.to_lowercase().replace("-", "_");
721 match mgr.create(&slug).await {
722 Ok(wt) => {
723 if let Err(e) = mgr.inject_workspace_stub(&wt.path) {
724 warn!(
725 story_id = %story_id,
726 error = %e,
727 "Failed to inject workspace stub"
728 );
729 }
730 info!(
731 story_id = %story_id,
732 worktree_path = %wt.path.display(),
733 "Pre-created worktree for story"
734 );
735 (story_id, Some(wt))
736 }
737 Err(e) => {
738 warn!(
739 story_id = %story_id,
740 error = %e,
741 "Failed to pre-create worktree, will use main directory"
742 );
743 (story_id, None)
744 }
745 }
746 }
747 })
748 .buffer_unordered(8)
749 .collect()
750 .await;
751 for (sid, wt) in created {
752 if let Some(wt) = wt {
753 worktrees_by_story.insert(sid, wt);
754 }
755 }
756 }
757
758 let semaphore = Arc::new(tokio::sync::Semaphore::new(
760 self.config.max_concurrent_stories,
761 ));
762 let provider = Arc::clone(&self.provider);
763 let model = self.model.clone();
764 let prd_info = (
765 self.state.prd.project.clone(),
766 self.state.prd.feature.clone(),
767 );
768 let working_dir = self.state.working_dir.clone();
769 let progress_path = self.config.progress_path.clone();
770
771 let mut handles = Vec::new();
772
773 let accumulated_learnings = self.bus_collect_learnings();
775
776 for story in stories {
777 let sem = Arc::clone(&semaphore);
778 let provider = Arc::clone(&provider);
779 let model = model.clone();
780 let prd_info = prd_info.clone();
781 let working_dir = working_dir.clone();
782 let worktree_mgr = worktree_mgr.clone();
783 let prebuilt_worktree = worktrees_by_story.remove(&story.id);
784 let progress_path = progress_path.clone();
785 let ralph_tx = self.event_tx.clone();
786 let stage_learnings = accumulated_learnings.clone();
787 let bus = self.bus.clone();
788 let relay_enabled = self.config.relay_enabled;
789 let relay_max_agents = self.config.relay_max_agents;
790 let relay_max_rounds = self.config.relay_max_rounds;
791 let registry = self.registry.clone();
792 let max_steps_per_story = self.config.max_steps_per_story;
793 let max_quality_retries = self.config.max_quality_retries;
794 let max_rate_limit_retries = self.config.max_rate_limit_retries;
795 let rate_limit_base_delay_ms = self.config.rate_limit_base_delay_ms;
796 let quality_checks_enabled = self.config.quality_checks_enabled;
797 let quality_checks = self.state.prd.quality_checks.clone();
798 let delegation = self.delegation.clone();
799
800 let handle: tokio::task::JoinHandle<(
801 crate::ralph::types::UserStory,
802 bool,
803 crate::ralph::types::ProgressEntry,
804 Option<crate::worktree::WorktreeInfo>,
805 Option<std::sync::Arc<crate::worktree::WorktreeManager>>,
806 bool, Option<String>,
808 )> = tokio::spawn(async move {
809 let _permit = sem.acquire().await.expect("semaphore closed");
810
811 let (story_working_dir, worktree_info) = if let Some(wt) = prebuilt_worktree {
818 (wt.path.clone(), Some(wt))
819 } else {
820 (working_dir.clone(), None)
821 };
822
823 info!(
824 "Working on story: {} - {} (in {:?})",
825 story.id, story.title, story_working_dir
826 );
827
828 if let Some(ref tx) = ralph_tx {
830 let _ = tx
831 .send(RalphEvent::StoryStarted {
832 story_id: story.id.clone(),
833 })
834 .await;
835 }
836
837 let mut prompt =
839 Self::build_story_prompt(&story, &prd_info, &story_working_dir);
840 if !stage_learnings.is_empty() {
841 prompt.push_str("\n## Learnings from Previous Stages:\n");
842 for l in &stage_learnings {
843 prompt.push_str(&format!("- {l}\n"));
844 }
845 }
846
847 let (bridge_tx, _bridge_handle) = if !relay_enabled {
849 if let Some(ref tx) = ralph_tx {
850 let (btx, handle) =
851 Self::create_swarm_event_bridge(tx, story.id.clone());
852 (Some(btx), Some(handle))
853 } else {
854 (None, None)
855 }
856 } else {
857 (None, None)
858 };
859
860 let call_result = {
862 let mut last_err = None;
863 let mut attempt_result = None;
864 for attempt in 0..=max_rate_limit_retries {
865 let res = if relay_enabled {
866 if let Some(ref reg) = registry {
867 Self::call_relay_static(
868 reg,
869 &model,
870 &prompt,
871 &story_working_dir,
872 ralph_tx.clone(),
873 story.id.clone(),
874 bus.clone(),
875 relay_max_agents,
876 relay_max_rounds,
877 delegation.as_ref(),
878 )
879 .await
880 } else {
881 warn!(
882 story_id = %story.id,
883 "Relay enabled but no registry available, using single agent"
884 );
885 Self::call_llm_static(
886 &provider,
887 &model,
888 &prompt,
889 &story_working_dir,
890 bridge_tx.clone(),
891 story.id.clone(),
892 bus.clone(),
893 max_steps_per_story,
894 )
895 .await
896 }
897 } else {
898 Self::call_llm_static(
899 &provider,
900 &model,
901 &prompt,
902 &story_working_dir,
903 bridge_tx.clone(),
904 story.id.clone(),
905 bus.clone(),
906 max_steps_per_story,
907 )
908 .await
909 };
910 match res {
911 Ok(output) => {
912 attempt_result = Some(Ok(output));
913 break;
914 }
915 Err(e) => {
916 let err_str = format!("{e}");
917 let is_rate_limit = err_str.contains("Rate limit")
918 || err_str.contains("rate_limit")
919 || err_str.contains("429")
920 || err_str.contains("too many requests");
921 if is_rate_limit && attempt < max_rate_limit_retries {
922 let delay_ms = rate_limit_base_delay_ms
923 * 2u64.saturating_pow(attempt as u32);
924 warn!(
925 story_id = %story.id,
926 attempt = attempt + 1,
927 max_retries = max_rate_limit_retries,
928 delay_ms = delay_ms,
929 "Rate limited, backing off before retry"
930 );
931 tokio::time::sleep(std::time::Duration::from_millis(
932 delay_ms,
933 ))
934 .await;
935 } else {
936 last_err = Some(e);
937 break;
938 }
939 }
940 }
941 }
942 attempt_result.unwrap_or_else(|| {
943 Err(last_err
944 .unwrap_or_else(|| anyhow::anyhow!("All retries exhausted")))
945 })
946 };
947
948 let (final_result, quality_passed, failure_reason) = match call_result {
951 Ok(initial_output) => {
952 let mut output = initial_output;
953 let mut qg_passed = false;
954 let mut failure_reason = None;
955
956 if quality_checks_enabled {
957 for qg_attempt in 0..=max_quality_retries {
958 let cargo_root = Self::find_cargo_root(&story_working_dir);
959 let checks = &quality_checks;
960 let mut all_ok = true;
961 let mut error_output = String::new();
962
963 for (name, cmd) in [
964 ("typecheck", &checks.typecheck),
965 ("lint", &checks.lint),
966 ("test", &checks.test),
967 ("build", &checks.build),
968 ] {
969 if let Some(command) = cmd {
970 let cmd_output = std::process::Command::new("/bin/sh")
971 .arg("-c")
972 .arg(command)
973 .current_dir(&cargo_root)
974 .output();
975 match cmd_output {
976 Ok(o) if o.status.success() => {}
977 Ok(o) => {
978 let stderr = String::from_utf8_lossy(&o.stderr);
979 let stdout = String::from_utf8_lossy(&o.stdout);
980 let combined = format!("{stdout}\n{stderr}");
981 let errors: String = combined
983 .lines()
984 .filter(|l| {
985 l.starts_with("error")
986 || l.contains("error:")
987 || l.contains("error[")
988 })
989 .take(20)
990 .collect::<Vec<_>>()
991 .join("\n");
992 error_output = format!(
993 "Quality check `{name}` failed:\n{errors}"
994 );
995 all_ok = false;
996 break;
997 }
998 Err(e) => {
999 error_output = format!(
1000 "Quality check `{name}` could not run: {e}"
1001 );
1002 all_ok = false;
1003 break;
1004 }
1005 }
1006 }
1007 }
1008
1009 if all_ok {
1010 match run_story_verification(&story_working_dir, &story)
1011 .await
1012 {
1013 Ok(()) => {
1014 qg_passed = true;
1015 failure_reason = None;
1016 info!(
1017 story_id = %story.id,
1018 attempt = qg_attempt + 1,
1019 "Quality gates and verification passed"
1020 );
1021 break;
1022 }
1023 Err(e) => {
1024 error_output =
1025 format!("Verification steps failed:\n{e}");
1026 failure_reason = Some(error_output.clone());
1027 }
1028 }
1029 }
1030
1031 if qg_attempt < max_quality_retries {
1032 if !error_output.is_empty() {
1033 failure_reason = Some(error_output.clone());
1034 }
1035 warn!(
1037 story_id = %story.id,
1038 attempt = qg_attempt + 1,
1039 max_retries = max_quality_retries,
1040 "Quality gates failed, spawning repair agent"
1041 );
1042
1043 let repair_prompt = format!(
1044 "# QUALITY GATE REPAIR (attempt {}/{})\n\n\
1045 The previous implementation attempt for story {} had quality gate failures.\n\n\
1046 ## Errors:\n```\n{}\n```\n\n\
1047 ## Instructions:\n\
1048 1. Read the files mentioned in the errors\n\
1049 2. Fix ONLY the compile/lint errors shown above\n\
1050 3. Run `cargo check 2>&1` to verify your fix\n\
1051 4. Do NOT refactor or change unrelated code\n\n\
1052 Working directory: {}",
1053 qg_attempt + 1,
1054 max_quality_retries,
1055 story.id,
1056 error_output,
1057 story_working_dir.display()
1058 );
1059
1060 let repair_result = Self::call_llm_static(
1062 &provider,
1063 &model,
1064 &repair_prompt,
1065 &story_working_dir,
1066 bridge_tx.clone(),
1067 story.id.clone(),
1068 bus.clone(),
1069 20, )
1071 .await;
1072
1073 match repair_result {
1074 Ok(repair_output) => {
1075 output = format!(
1076 "{output}\n---\n[repair attempt {}] {repair_output}",
1077 qg_attempt + 1
1078 );
1079 }
1080 Err(e) => {
1081 warn!(
1082 story_id = %story.id,
1083 error = %e,
1084 "Repair agent failed"
1085 );
1086 break;
1087 }
1088 }
1089 } else {
1090 if !error_output.is_empty() {
1091 failure_reason = Some(error_output.clone());
1092 }
1093 warn!(
1094 story_id = %story.id,
1095 "Quality gates still failing after {} retries",
1096 max_quality_retries
1097 );
1098 }
1099 }
1100 } else {
1101 match run_story_verification(&story_working_dir, &story).await {
1102 Ok(()) => {
1103 qg_passed = true;
1104 failure_reason = None;
1105 }
1106 Err(e) => {
1107 failure_reason =
1108 Some(format!("Verification steps failed:\n{e}"));
1109 }
1110 }
1111 }
1112
1113 (Ok(output), qg_passed, failure_reason)
1114 }
1115 Err(e) => (Err(e), false, None),
1116 };
1117
1118 let entry = match &final_result {
1119 Ok(response) => {
1120 let progress_file = story_working_dir.join(&progress_path);
1122 let _ = std::fs::write(&progress_file, response);
1123
1124 ProgressEntry {
1125 story_id: story.id.clone(),
1126 iteration: 1,
1127 status: "completed".to_string(),
1128 learnings: Self::extract_learnings_static(response),
1129 files_changed: Vec::new(),
1130 timestamp: chrono::Utc::now().to_rfc3339(),
1131 }
1132 }
1133 Err(e) => {
1134 warn!("LLM call failed for story {}: {}", story.id, e);
1135 ProgressEntry {
1136 story_id: story.id.clone(),
1137 iteration: 1,
1138 status: format!("failed: {}", e),
1139 learnings: Vec::new(),
1140 files_changed: Vec::new(),
1141 timestamp: chrono::Utc::now().to_rfc3339(),
1142 }
1143 }
1144 };
1145
1146 (
1147 story,
1148 final_result.is_ok(),
1149 entry,
1150 worktree_info,
1151 worktree_mgr,
1152 quality_passed,
1153 failure_reason,
1154 )
1155 });
1156
1157 handles.push(handle);
1158 }
1159
1160 let _stage_passed = 0usize;
1162 for handle in handles {
1163 match handle.await {
1164 Ok((
1165 story,
1166 success,
1167 entry,
1168 worktree_info,
1169 worktree_mgr,
1170 quality_passed,
1171 failure_reason,
1172 )) => {
1173 self.state.progress_log.push(entry);
1174
1175 if success && quality_passed {
1176 {
1179 info!("Story {} passed quality checks!", story.id);
1180
1181 if let Some(ref wt) = worktree_info {
1183 let _ = self.commit_in_dir(&wt.path, &story);
1184 }
1185
1186 if let (Some(wt), Some(mgr)) =
1188 (worktree_info.as_ref(), worktree_mgr.as_ref())
1189 {
1190 match mgr.merge(&wt.name).await {
1191 Ok(merge_result) => {
1192 if merge_result.success {
1193 info!(
1194 story_id = %story.id,
1195 files_changed = merge_result.files_changed,
1196 "Merged story changes successfully"
1197 );
1198 if let Err(error) =
1199 run_story_verification(&working_dir, &story)
1200 .await
1201 {
1202 let error = format!(
1203 "Verification steps failed after merge: {error}"
1204 );
1205 warn!(story_id = %story.id, error = %error);
1206 self.record_story_result(
1207 &story,
1208 false,
1209 Some(error),
1210 );
1211 self.try_send_event(
1212 RalphEvent::StoryComplete {
1213 story_id: story.id.clone(),
1214 passed: false,
1215 },
1216 );
1217 let _ = mgr.cleanup(&wt.name).await;
1218 continue;
1219 }
1220 self.state.prd.mark_passed(&story.id);
1221 self.try_send_event(RalphEvent::StoryMerge {
1222 story_id: story.id.clone(),
1223 success: true,
1224 summary: merge_result.summary.clone(),
1225 });
1226 self.try_send_event(RalphEvent::StoryComplete {
1227 story_id: story.id.clone(),
1228 passed: true,
1229 });
1230
1231 self.record_story_result(&story, true, None);
1233
1234 let _ = mgr.cleanup(&wt.name).await;
1236 } else if !merge_result.conflicts.is_empty() {
1237 info!(
1239 story_id = %story.id,
1240 num_conflicts = merge_result.conflicts.len(),
1241 "Spawning conflict resolver sub-agent"
1242 );
1243
1244 match Self::resolve_conflicts_static(
1246 &provider,
1247 &model,
1248 &working_dir,
1249 &story,
1250 &merge_result.conflicts,
1251 &merge_result.conflict_diffs,
1252 self.bus.clone(),
1253 )
1254 .await
1255 {
1256 Ok(resolved) => {
1257 if resolved {
1258 let commit_msg = format!(
1260 "Merge: resolved conflicts for {}",
1261 story.id
1262 );
1263 match mgr
1264 .complete_merge(
1265 &wt.name,
1266 &commit_msg,
1267 )
1268 .await
1269 {
1270 Ok(final_result) => {
1271 if final_result.success {
1272 info!(
1273 story_id = %story.id,
1274 "Merge completed after conflict resolution"
1275 );
1276 match run_story_verification(
1277 &working_dir,
1278 &story,
1279 )
1280 .await
1281 {
1282 Ok(()) => self
1283 .state
1284 .prd
1285 .mark_passed(
1286 &story.id,
1287 ),
1288 Err(e) => {
1289 let error = format!(
1290 "Verification failed after conflict resolution: {e}"
1291 );
1292 warn!(
1293 story_id = %story.id,
1294 error = %error
1295 );
1296 self.record_story_result(
1297 &story,
1298 false,
1299 Some(error),
1300 );
1301 self.try_send_event(RalphEvent::StoryComplete {
1302 story_id: story.id.clone(),
1303 passed: false,
1304 });
1305 }
1306 }
1307 } else {
1308 warn!(
1309 story_id = %story.id,
1310 "Merge failed even after resolution"
1311 );
1312 let _ = mgr
1313 .abort_merge(&wt.name)
1314 .await;
1315 }
1316 }
1317 Err(e) => {
1318 warn!(
1319 story_id = %story.id,
1320 error = %e,
1321 "Failed to complete merge after resolution"
1322 );
1323 let _ = mgr
1324 .abort_merge(&wt.name)
1325 .await;
1326 }
1327 }
1328 } else {
1329 warn!(
1330 story_id = %story.id,
1331 "Conflict resolver could not resolve all conflicts"
1332 );
1333 let _ = mgr.abort_merge(&wt.name).await;
1334 }
1335 }
1336 Err(e) => {
1337 warn!(
1338 story_id = %story.id,
1339 error = %e,
1340 "Conflict resolver failed"
1341 );
1342 let _ = mgr.abort_merge(&wt.name).await;
1343 }
1344 }
1345 let _ = mgr.cleanup(&wt.name).await;
1347 } else if merge_result.aborted {
1348 warn!(
1350 story_id = %story.id,
1351 summary = %merge_result.summary,
1352 "Merge was aborted due to non-conflict failure"
1353 );
1354 let _ = mgr.cleanup(&wt.name).await;
1356 } else {
1357 warn!(
1359 story_id = %story.id,
1360 summary = %merge_result.summary,
1361 "Merge failed but not aborted - manual intervention may be needed"
1362 );
1363 }
1365 }
1366 Err(e) => {
1367 warn!(
1368 story_id = %story.id,
1369 error = %e,
1370 "Failed to merge worktree"
1371 );
1372 }
1373 }
1374 } else {
1375 if let Err(error) =
1377 run_story_verification(&working_dir, &story).await
1378 {
1379 let error = format!("Verification steps failed: {error}");
1380 warn!(story_id = %story.id, error = %error);
1381 self.record_story_result(&story, false, Some(error));
1382 self.try_send_event(RalphEvent::StoryComplete {
1383 story_id: story.id.clone(),
1384 passed: false,
1385 });
1386 continue;
1387 }
1388 self.state.prd.mark_passed(&story.id);
1389 self.try_send_event(RalphEvent::StoryComplete {
1390 story_id: story.id.clone(),
1391 passed: true,
1392 });
1393
1394 self.record_story_result(&story, true, None);
1396 }
1397 }
1398 } else {
1399 let error = failure_reason.unwrap_or_else(|| {
1401 if success {
1402 "Quality or verification checks failed".to_string()
1403 } else {
1404 "LLM call failed".to_string()
1405 }
1406 });
1407 self.try_send_event(RalphEvent::StoryError {
1408 story_id: story.id.clone(),
1409 error: error.clone(),
1410 });
1411 self.record_story_result(&story, false, Some(error));
1412 if let Some(ref wt) = worktree_info {
1413 info!(
1414 story_id = %story.id,
1415 worktree_path = %wt.path.display(),
1416 "Keeping worktree for debugging (story failed)"
1417 );
1418 }
1419 }
1420 }
1421 Err(e) => {
1422 warn!("Story execution task failed: {}", e);
1423 }
1424 }
1425 }
1426
1427 self.state.current_iteration += 1;
1429
1430 if self.bus.is_some() {
1432 let bus_learnings: Vec<String> = self
1433 .state
1434 .progress_log
1435 .iter()
1436 .flat_map(|e| e.learnings.clone())
1437 .collect();
1438 for story in &self.state.prd.user_stories {
1439 if story.passes {
1440 self.bus_publish_story_result(
1441 story,
1442 self.state.current_iteration,
1443 &bus_learnings,
1444 None,
1445 );
1446 }
1447 }
1448 }
1449
1450 self.state.prd.save(&self.state.prd_path).await?;
1452
1453 if let Some(ref store) = self.store {
1455 let s = store.clone();
1456 let rid = self.run_id.clone();
1457 let prd = self.state.prd.clone();
1458 let iter = self.state.current_iteration;
1459 self.store_fire_and_forget(async move {
1460 s.update_prd(&rid, &prd).await?;
1461 s.update_iteration(&rid, iter).await
1462 });
1463 }
1464 }
1465
1466 Ok(())
1467 }
1468
1469 fn build_story_prompt(
1471 story: &UserStory,
1472 prd_info: &(String, String),
1473 working_dir: &Path,
1474 ) -> String {
1475 let wd = working_dir.display();
1476 format!(
1477 r#"# PRD: {} - {}
1478
1479## Working Directory: {wd}
1480
1481## Current Story: {} - {}
1482
1483{}
1484
1485### Acceptance Criteria:
1486{}
1487
1488### Verification Steps:
1489{}
1490
1491## WORKFLOW (follow this exactly):
1492
14931. **EXPLORE** (2-4 tool calls): Use `glob` and `read` to understand existing code
14942. **IMPLEMENT** (5-15 tool calls): Use `write` or `edit` to make changes
14953. **VERIFY**: Run a verification command appropriate to the language (see below)
14964. **FIX OR FINISH**:
1497 - If no errors: Output `STORY_COMPLETE: {}` and STOP
1498 - If errors: Parse the error, fix it, re-verify (max 3 fix attempts)
1499 - After 3 failed attempts: Output `STORY_BLOCKED: <error summary>` and STOP
1500
1501## VERIFICATION BY LANGUAGE:
1502- **Rust**: `bash` with `cargo check 2>&1`
1503- **Python**: `bash` with `python -c "import ast; ast.parse(open('FILE').read())"` for each changed file
1504- **TypeScript/JavaScript**: `bash` with `npx tsc --noEmit` (if tsconfig.json exists)
1505- Choose the right check for the files you modified
1506
1507## TOOL USAGE:
1508- `read`: Read file content (always read before editing!)
1509- `edit`: Modify files (MUST include 3+ lines before/after for unique context)
1510- `write`: Create new files
1511- `bash`: Run commands with `{{"command": "...", "cwd": "{wd}"}}`
1512
1513## CRITICAL RULES:
1514- ALWAYS read a file before editing it
1515- When edit fails with "ambiguous match", include MORE context lines
1516- Do NOT add TODO/placeholder comments
1517- Count your fix attempts - STOP after 3 failures
1518
1519## TERMINATION:
1520SUCCESS: Output `STORY_COMPLETE: {}`
1521BLOCKED: Output `STORY_BLOCKED: <brief error description>`
1522
1523Do NOT keep iterating indefinitely. Stop when done or blocked.
1524"#,
1525 prd_info.0,
1526 prd_info.1,
1527 story.id,
1528 story.title,
1529 story.description,
1530 story
1531 .acceptance_criteria
1532 .iter()
1533 .map(|c| format!("- {}", c))
1534 .collect::<Vec<_>>()
1535 .join("\n"),
1536 Self::format_verification_steps(story),
1537 story.id,
1538 story.id
1539 )
1540 }
1541
1542 fn format_verification_steps(story: &UserStory) -> String {
1543 if story.verification_steps.is_empty() {
1544 "None".to_string()
1545 } else {
1546 serde_json::to_string_pretty(&story.verification_steps)
1547 .unwrap_or_else(|_| "Unable to render verification steps".to_string())
1548 }
1549 }
1550
1551 async fn call_llm_static(
1553 provider: &Arc<dyn Provider>,
1554 model: &str,
1555 prompt: &str,
1556 working_dir: &PathBuf,
1557 event_tx: Option<mpsc::Sender<SwarmEvent>>,
1558 story_id: String,
1559 bus: Option<Arc<AgentBus>>,
1560 max_steps_per_story: usize,
1561 ) -> anyhow::Result<String> {
1562 let system_prompt = crate::agent::builtin::build_system_prompt(working_dir);
1564
1565 let tool_registry =
1567 ToolRegistry::with_provider_arc(Arc::clone(provider), model.to_string());
1568
1569 let tool_definitions: Vec<_> = tool_registry
1571 .definitions()
1572 .into_iter()
1573 .filter(|t| t.name != "question")
1574 .collect();
1575
1576 info!(
1577 "Ralph sub-agent starting with {} tools in {:?}",
1578 tool_definitions.len(),
1579 working_dir
1580 );
1581
1582 let max_steps = max_steps_per_story;
1584 let (output, steps, tool_calls, _exit_reason) = run_agent_loop(
1585 Arc::clone(provider),
1586 model,
1587 &system_prompt,
1588 prompt,
1589 tool_definitions,
1590 tool_registry, max_steps,
1592 180, event_tx,
1594 story_id,
1595 bus.clone(),
1596 Some(working_dir.clone()),
1597 )
1598 .await?;
1599
1600 info!(
1601 "Ralph sub-agent completed: {} steps, {} tool calls",
1602 steps, tool_calls
1603 );
1604
1605 Ok(output)
1606 }
1607
1608 fn build_implementation_relay_profiles(
1610 max_agents: usize,
1611 working_dir: &std::path::Path,
1612 ) -> Vec<(String, String, Vec<String>)> {
1613 let wd = working_dir.display();
1614 let mut profiles = vec![
1615 (
1616 "auto-planner".to_string(),
1617 format!(
1618 "You are @auto-planner.\n\
1619 Specialty: Code analysis and implementation planning.\n\
1620 Mission: Read the codebase to understand existing patterns, then produce a concrete step-by-step implementation plan.\n\
1621 Working directory: {wd}\n\n\
1622 Read existing code first using the `read` tool. Identify all files that need changes.\n\
1623 Produce a numbered list of specific changes with file paths and code snippets.\n\
1624 Pass your plan in a clear format the next agent can follow step by step."
1625 ),
1626 vec!["planning".into(), "analysis".into(), "relay".into()],
1627 ),
1628 (
1629 "auto-coder".to_string(),
1630 format!(
1631 "You are @auto-coder.\n\
1632 Specialty: Code implementation.\n\
1633 Mission: Implement changes according to the plan, write production code, verify it compiles.\n\
1634 Working directory: {wd}\n\n\
1635 Follow the plan from the incoming handoff. Use `edit` and `write` tools to make changes.\n\
1636 After implementing, run `bash` with `cargo check 2>&1` (cwd: {wd}) to verify.\n\
1637 Fix any compilation errors. Report what you implemented."
1638 ),
1639 vec!["implementation".into(), "coding".into(), "relay".into()],
1640 ),
1641 (
1642 "auto-reviewer".to_string(),
1643 format!(
1644 "You are @auto-reviewer.\n\
1645 Specialty: Code review and quality verification.\n\
1646 Mission: Review implemented changes, run quality checks, fix remaining issues.\n\
1647 Working directory: {wd}\n\n\
1648 Review the code changes from the incoming handoff. Run `bash` with `cargo check 2>&1` (cwd: {wd}).\n\
1649 Fix any remaining issues. Verify the implementation meets the acceptance criteria.\n\
1650 If complete, include STORY_COMPLETE in your output. If blocked, include STORY_BLOCKED."
1651 ),
1652 vec!["review".into(), "verification".into(), "relay".into()],
1653 ),
1654 ];
1655
1656 if max_agents >= 4 {
1657 profiles.push((
1658 "auto-tester".to_string(),
1659 format!(
1660 "You are @auto-tester.\n\
1661 Specialty: Test writing and verification.\n\
1662 Mission: Write tests for the implemented changes and ensure they pass.\n\
1663 Working directory: {wd}\n\n\
1664 Review the implementation, write appropriate tests, run them with `bash`.\n\
1665 Report test results and coverage gaps."
1666 ),
1667 vec!["testing".into(), "relay".into()],
1668 ));
1669 }
1670
1671 if max_agents >= 5 {
1672 profiles.push((
1673 "auto-refactorer".to_string(),
1674 format!(
1675 "You are @auto-refactorer.\n\
1676 Specialty: Code quality and lint compliance.\n\
1677 Mission: Run clippy, fix warnings, improve code quality without changing behavior.\n\
1678 Working directory: {wd}\n\n\
1679 Run `bash` with `cargo clippy --all-features 2>&1` (cwd: {wd}).\n\
1680 Fix warnings and improve naming, structure, error handling."
1681 ),
1682 vec!["quality".into(), "relay".into()],
1683 ));
1684 }
1685
1686 profiles.truncate(max_agents);
1687 profiles
1688 }
1689
1690 fn normalize_for_relay(text: &str) -> String {
1692 let mut normalized = String::with_capacity(text.len().min(512));
1693 let mut last_was_space = false;
1694 for ch in text.chars() {
1695 if ch.is_ascii_alphanumeric() {
1696 normalized.push(ch.to_ascii_lowercase());
1697 last_was_space = false;
1698 } else if ch.is_whitespace() && !last_was_space {
1699 normalized.push(' ');
1700 last_was_space = true;
1701 }
1702 if normalized.len() >= 280 {
1703 break;
1704 }
1705 }
1706 normalized.trim().to_string()
1707 }
1708
1709 fn prepare_relay_handoff(from_agent: &str, output: &str, original_task: &str) -> String {
1711 let max_len = 6_000;
1712 let relay_payload = if output.len() > max_len {
1713 let truncated: String = output.chars().take(max_len).collect();
1714 format!("{truncated}\n... (truncated)")
1715 } else {
1716 output.to_string()
1717 };
1718
1719 format!(
1720 "Original task:\n{original_task}\n\n\
1721 Incoming handoff from @{from_agent}:\n{relay_payload}\n\n\
1722 Continue the work from this handoff. Keep your response focused and provide concrete next steps."
1723 )
1724 }
1725
    /// Work on one story with a team of relay agents that pass a "baton"
    /// (the current prompt text) from one agent to the next.
    ///
    /// * `registry` - provider registry handed to each session prompt and used
    ///   to pick a default provider name.
    /// * `model` - model used for every role unless `delegation` picks another.
    /// * `prompt` - the original task; it is the first baton and is restated
    ///   in every subsequent handoff.
    /// * `ralph_tx` - optional channel that receives `StoryToolCall` events for
    ///   each handoff and for each tool call an agent starts.
    /// * `bus` - shared agent bus; a fresh one is created when `None`.
    /// * `max_agents` / `max_rounds` - clamped to `2..=8` and `1..=5`.
    /// * `delegation` - when present, consulted to choose the model per role.
    ///
    /// Returns the final baton: the raw output of the last agent when the
    /// relay converged or an agent emitted `STORY_COMPLETE` / `STORY_BLOCKED`,
    /// otherwise the last prepared handoff text.
    ///
    /// # Errors
    /// Fails if a session cannot be created, if an agent's task cannot be
    /// joined, or if an agent's prompt returns an error.
    // NOTE(review): the error paths return via `?`/`bail!` before
    // `relay.shutdown_agents` is reached — confirm whether registered agents
    // need explicit cleanup in that case.
    async fn call_relay_static(
        registry: &Arc<ProviderRegistry>,
        model: &str,
        prompt: &str,
        working_dir: &Path,
        ralph_tx: Option<mpsc::Sender<RalphEvent>>,
        story_id: String,
        bus: Option<Arc<AgentBus>>,
        max_agents: usize,
        max_rounds: usize,
        delegation: Option<&DelegationState>,
    ) -> anyhow::Result<String> {
        // Keep the team size and round count within fixed bounds regardless of
        // what the caller asked for.
        let max_agents = max_agents.clamp(2, 8);
        let max_rounds = max_rounds.clamp(1, 5);

        let profiles = Self::build_implementation_relay_profiles(max_agents, working_dir);

        // Reuse the caller's bus when given; otherwise the relay gets its own.
        let relay_bus = bus.unwrap_or_else(|| Arc::new(AgentBus::new()));
        let relay = ProtocolRelayRuntime::new(relay_bus.clone());

        // One session per agent, keyed by agent name; `ordered_agents` keeps
        // the turn order.
        let mut sessions: HashMap<String, Session> = HashMap::new();
        let mut ordered_agents: Vec<String> = Vec::new();
        let mut relay_profiles: Vec<RelayAgentProfile> = Vec::new();

        for (name, instructions, capabilities) in &profiles {
            // First registered provider name, or "openai" if the registry
            // lists none.
            let default_provider = registry.list().first().copied().unwrap_or("openai");
            // Only the chosen model is used below; the provider name is ignored.
            let (_role_provider, role_model) = match delegation {
                Some(state) => super::delegation::choose_provider_for_role(
                    registry,
                    state,
                    name,
                    default_provider,
                    model,
                ),
                None => (default_provider.to_string(), model.to_string()),
            };
            let role_model_str = role_model.clone();

            let mut session = Session::new().await?;
            session.metadata.model = Some(role_model_str);
            session.set_agent_name(name.clone());
            session.bus = Some(relay_bus.clone());
            // The role instructions become the session's system message.
            session.add_message(Message {
                role: Role::System,
                content: vec![ContentPart::Text {
                    text: instructions.clone(),
                }],
            });

            relay_profiles.push(RelayAgentProfile {
                name: name.clone(),
                capabilities: capabilities.clone(),
            });
            ordered_agents.push(name.clone());
            sessions.insert(name.clone(), session);
        }

        relay.register_agents(&relay_profiles);

        info!(
            story_id = %story_id,
            agents = ordered_agents.len(),
            rounds = max_rounds,
            "Starting relay team for story"
        );

        let mut baton = prompt.to_string();
        // Normalized output of the previous turn, used to detect repetition.
        let mut previous_normalized: Option<String> = None;
        // Number of consecutive turns whose normalized output matched the
        // turn before it.
        let mut convergence_hits = 0usize;
        let mut turns = 0usize;

        'relay_loop: for round in 1..=max_rounds {
            for idx in 0..ordered_agents.len() {
                let to = ordered_agents[idx].clone();
                // The sender is the previous agent in the order; the first
                // agent receives from "user" in round one and from the last
                // agent in later rounds.
                let from = if idx == 0 {
                    if round == 1 {
                        "user".to_string()
                    } else {
                        ordered_agents[ordered_agents.len() - 1].clone()
                    }
                } else {
                    ordered_agents[idx - 1].clone()
                };

                turns += 1;
                relay.send_handoff(&from, &to, &baton);

                // Report the handoff itself as a tool-call event; a send
                // failure is ignored.
                if let Some(ref tx) = ralph_tx {
                    let _ = tx
                        .send(RalphEvent::StoryToolCall {
                            story_id: story_id.clone(),
                            tool_name: format!(
                                "relay: @{from} → @{to} (round {round}/{max_rounds})"
                            ),
                        })
                        .await;
                }

                // Take the session out of the map so it can be moved into the
                // spawned task; it is put back after the turn.
                let Some(mut session) = sessions.remove(&to) else {
                    anyhow::bail!("Relay agent @{to} session unavailable for story {story_id}");
                };

                let (event_tx, mut event_rx) = mpsc::channel::<SessionEvent>(256);
                let registry_clone = registry.clone();
                let baton_clone = baton.clone();

                // Run the prompt in its own task and return the session along
                // with the result so ownership comes back to this function.
                let join = tokio::spawn(async move {
                    let result = session
                        .prompt_with_events(&baton_clone, event_tx, registry_clone)
                        .await;
                    (session, result)
                });

                // Poll every 50 ms until the task finishes, forwarding each
                // tool-call start event seen in the meantime.
                while !join.is_finished() {
                    while let Ok(event) = event_rx.try_recv() {
                        if let Some(ref tx) = ralph_tx
                            && let SessionEvent::ToolCallStart { ref name, .. } = event
                        {
                            let _ = tx
                                .send(RalphEvent::StoryToolCall {
                                    story_id: story_id.clone(),
                                    tool_name: format!("@{to}: {name}"),
                                })
                                .await;
                        }
                    }
                    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                }

                let (updated_session, result) = join
                    .await
                    .map_err(|e| anyhow::anyhow!("Relay agent @{to} task error: {e}"))?;

                // Discard whatever events are still queued.
                // NOTE(review): events that arrive after the last poll are
                // dropped here rather than forwarded — confirm that is intended.
                while event_rx.try_recv().is_ok() {}

                sessions.insert(to.clone(), updated_session);

                let output = result
                    .map_err(|e| {
                        anyhow::anyhow!("Relay agent @{to} failed on story {story_id}: {e}")
                    })?
                    .text;

                // Count consecutive turns that repeat the previous turn's
                // normalized output; any different output resets the count.
                let normalized = Self::normalize_for_relay(&output);
                if previous_normalized.as_deref() == Some(normalized.as_str()) {
                    convergence_hits += 1;
                } else {
                    convergence_hits = 0;
                }
                previous_normalized = Some(normalized);

                // Two repeats in a row (three matching outputs) ends the relay.
                if convergence_hits >= 2 {
                    info!(story_id = %story_id, turns, "Relay converged");
                    baton = output;
                    break 'relay_loop;
                }

                // Either terminal marker ends the relay with the raw output.
                if output.contains("STORY_COMPLETE") || output.contains("STORY_BLOCKED") {
                    info!(story_id = %story_id, turns, "Story reached terminal state via relay");
                    baton = output;
                    break 'relay_loop;
                }

                baton = Self::prepare_relay_handoff(&to, &output, prompt);
            }
        }

        relay.shutdown_agents(&ordered_agents);

        info!(
            story_id = %story_id,
            turns,
            convergence_hits,
            "Relay team completed"
        );

        Ok(baton)
    }
1917
1918 async fn resolve_conflicts_static(
1920 provider: &Arc<dyn Provider>,
1921 model: &str,
1922 working_dir: &Path,
1923 story: &UserStory,
1924 conflicts: &[String],
1925 conflict_diffs: &[(String, String)],
1926 bus: Option<Arc<AgentBus>>,
1927 ) -> anyhow::Result<bool> {
1928 info!(
1929 story_id = %story.id,
1930 num_conflicts = conflicts.len(),
1931 "Starting conflict resolution sub-agent"
1932 );
1933
1934 let conflict_info = conflict_diffs
1936 .iter()
1937 .map(|(file, diff)| format!("### File: {}\n```diff\n{}\n```", file, diff))
1938 .collect::<Vec<_>>()
1939 .join("\n\n");
1940
1941 let prompt = format!(
1942 r#"# CONFLICT RESOLUTION TASK
1943
1944## Story Context: {} - {}
1945{}
1946
1947## Conflicting Files
1948The following files have merge conflicts that need resolution:
1949{}
1950
1951## Conflict Details
1952{}
1953
1954## Your Task
19551. Read each conflicting file to see the conflict markers
19562. Understand what BOTH sides are trying to do:
1957 - HEAD (main branch): the current state
1958 - The incoming branch: the sub-agent's changes for story {}
19593. Resolve each conflict by:
1960 - Keeping BOTH changes if they don't actually conflict
1961 - Merging the logic if they touch the same code
1962 - Preferring the sub-agent's changes if they implement the story requirement
19634. Remove ALL conflict markers (<<<<<<<, =======, >>>>>>>)
19645. Ensure the final code compiles: run `cargo check`
1965
1966## CRITICAL RULES
1967- Do NOT leave any conflict markers in files
1968- Do NOT just pick one side - understand and merge the intent
1969- MUST run `cargo check` after resolving to verify
1970- Stage resolved files with `git add <file>`
1971
1972## Termination
1973SUCCESS: Output `CONFLICTS_RESOLVED` when all files are resolved and compile
1974FAILED: Output `CONFLICTS_UNRESOLVED: <reason>` if you cannot resolve
1975
1976Working directory: {}
1977"#,
1978 story.id,
1979 story.title,
1980 story.description,
1981 conflicts
1982 .iter()
1983 .map(|f| format!("- {}", f))
1984 .collect::<Vec<_>>()
1985 .join("\n"),
1986 conflict_info,
1987 story.id,
1988 working_dir.display()
1989 );
1990
1991 let system_prompt = crate::agent::builtin::build_system_prompt(working_dir);
1993
1994 let tool_registry =
1996 ToolRegistry::with_provider_arc(Arc::clone(provider), model.to_string());
1997
1998 let tool_definitions: Vec<_> = tool_registry
1999 .definitions()
2000 .into_iter()
2001 .filter(|t| t.name != "question")
2002 .collect();
2003
2004 info!(
2005 "Conflict resolver starting with {} tools",
2006 tool_definitions.len()
2007 );
2008
2009 let (output, steps, tool_calls, _exit_reason) = run_agent_loop(
2011 Arc::clone(provider),
2012 model,
2013 &system_prompt,
2014 &prompt,
2015 tool_definitions,
2016 tool_registry,
2017 15, 120, None,
2020 String::new(),
2021 bus.clone(),
2022 Some(working_dir.to_path_buf()),
2023 )
2024 .await?;
2025
2026 info!(
2027 story_id = %story.id,
2028 steps = steps,
2029 tool_calls = tool_calls,
2030 "Conflict resolver completed"
2031 );
2032
2033 let resolved = output.contains("CONFLICTS_RESOLVED")
2035 || (output.contains("resolved") && !output.contains("UNRESOLVED"));
2036
2037 if resolved {
2038 info!(story_id = %story.id, "Conflicts resolved successfully");
2039 } else {
2040 warn!(
2041 story_id = %story.id,
2042 output = %output.chars().take(200).collect::<String>(),
2043 "Conflict resolution may have failed"
2044 );
2045 }
2046
2047 Ok(resolved)
2048 }
2049
2050 fn extract_learnings_static(response: &str) -> Vec<String> {
2052 response
2053 .lines()
2054 .filter(|line| {
2055 line.contains("learned") || line.contains("Learning") || line.contains("# What")
2056 })
2057 .map(|line| line.trim().to_string())
2058 .collect()
2059 }
2060
2061 fn commit_in_dir(&self, dir: &PathBuf, story: &UserStory) -> anyhow::Result<()> {
2063 let _ = Command::new("git")
2065 .args(["add", "-A"])
2066 .current_dir(dir)
2067 .output();
2068
2069 let msg = format!("feat({}): {}", story.id.to_lowercase(), story.title);
2071 let provenance = self.commit_provenance();
2072 let _ = git_commit_with_provenance_blocking(dir, &msg, Some(&provenance));
2073
2074 Ok(())
2075 }
2076
2077 fn find_cargo_root(dir: &Path) -> PathBuf {
2082 fn has_cargo_toml(path: &Path) -> bool {
2083 path.join("Cargo.toml").exists()
2084 }
2085
2086 if has_cargo_toml(dir) {
2088 return dir.to_path_buf();
2089 }
2090
2091 if let Ok(entries) = std::fs::read_dir(dir) {
2093 let mut subdirs: Vec<_> = entries
2094 .filter_map(|e| e.ok())
2095 .filter(|e| e.path().is_dir())
2096 .map(|e| e.path())
2097 .collect();
2098
2099 subdirs.sort();
2101
2102 for subdir in subdirs {
2103 if has_cargo_toml(&subdir) {
2104 return subdir;
2105 }
2106
2107 if let Ok(sub_entries) = std::fs::read_dir(&subdir) {
2109 let mut sub_subdirs: Vec<_> = sub_entries
2110 .filter_map(|e| e.ok())
2111 .filter(|e| e.path().is_dir())
2112 .map(|e| e.path())
2113 .collect();
2114
2115 sub_subdirs.sort();
2116
2117 for sub_subdir in sub_subdirs {
2118 if has_cargo_toml(&sub_subdir) {
2119 return sub_subdir;
2120 }
2121 }
2122 }
2123 }
2124 }
2125
2126 warn!(
2128 dir = %dir.display(),
2129 "No Cargo.toml found, using worktree root for cargo commands"
2130 );
2131 dir.to_path_buf()
2132 }
2133
2134 async fn run_quality_gates_in_dir_with_events(
2136 &self,
2137 dir: &Path,
2138 story_id: &str,
2139 ) -> anyhow::Result<bool> {
2140 let cargo_root = Self::find_cargo_root(dir);
2142 debug!(
2143 worktree_dir = %dir.display(),
2144 cargo_root = %cargo_root.display(),
2145 "Running quality gates"
2146 );
2147
2148 let checks = &self.state.prd.quality_checks;
2149 let mut all_passed = true;
2150
2151 for (name, cmd) in [
2152 ("typecheck", &checks.typecheck),
2153 ("lint", &checks.lint),
2154 ("test", &checks.test),
2155 ("build", &checks.build),
2156 ] {
2157 if let Some(command) = cmd {
2158 debug!("Running {} check in {:?}: {}", name, cargo_root, command);
2159 let output = Command::new("/bin/sh")
2160 .arg("-c")
2161 .arg(command)
2162 .current_dir(&cargo_root)
2163 .output()
2164 .map_err(|e| {
2165 anyhow::anyhow!("Failed to run quality check '{}': {}", name, e)
2166 })?;
2167
2168 let passed = output.status.success();
2169 self.try_send_event(RalphEvent::StoryQualityCheck {
2170 story_id: story_id.to_string(),
2171 check_name: name.to_string(),
2172 passed,
2173 });
2174
2175 if !passed {
2176 let stderr = String::from_utf8_lossy(&output.stderr);
2177 let stdout = String::from_utf8_lossy(&output.stdout);
2178 let combined = format!("{}\n{}", stdout, stderr);
2179 let error_summary: String = combined
2180 .lines()
2181 .filter(|line| {
2182 line.starts_with("error")
2183 || line.contains("error:")
2184 || line.contains("error[")
2185 })
2186 .take(5)
2187 .collect::<Vec<_>>()
2188 .join("\n");
2189 warn!(
2190 check = %name,
2191 cargo_root = %cargo_root.display(),
2192 error_summary = %error_summary.chars().take(300).collect::<String>(),
2193 "{} check failed in {:?}",
2194 name,
2195 cargo_root
2196 );
2197 all_passed = false;
2198 break; }
2200 }
2201 }
2202
2203 Ok(all_passed)
2204 }
2205
2206 fn build_prompt(&self, story: &UserStory) -> String {
2208 let progress = self.load_progress().unwrap_or_default();
2209
2210 format!(
2211 r#"# PRD: {} - {}
2212
2213## Current Story: {} - {}
2214
2215{}
2216
2217### Acceptance Criteria:
2218{}
2219
2220### Verification Steps:
2221{}
2222
2223## Previous Progress:
2224{}
2225
2226## Instructions:
22271. Implement the requirements for this story
22282. Write any necessary code changes
22293. Document what you learned
22304. End with `STORY_COMPLETE: {}` when done
2231
2232Respond with the implementation and any shell commands needed.
2233"#,
2234 self.state.prd.project,
2235 self.state.prd.feature,
2236 story.id,
2237 story.title,
2238 story.description,
2239 story
2240 .acceptance_criteria
2241 .iter()
2242 .map(|c| format!("- {}", c))
2243 .collect::<Vec<_>>()
2244 .join("\n"),
2245 Self::format_verification_steps(story),
2246 if progress.is_empty() {
2247 "None yet".to_string()
2248 } else {
2249 progress
2250 },
2251 story.id
2252 )
2253 }
2254
2255 async fn call_llm(&self, story_id: &str, prompt: &str) -> anyhow::Result<String> {
2257 let system_prompt = crate::agent::builtin::build_system_prompt(&self.state.working_dir);
2259
2260 let tool_registry =
2262 ToolRegistry::with_provider_arc(Arc::clone(&self.provider), self.model.clone());
2263
2264 let tool_definitions: Vec<_> = tool_registry
2266 .definitions()
2267 .into_iter()
2268 .filter(|t| t.name != "question")
2269 .collect();
2270
2271 info!(
2272 "Ralph agent starting with {} tools in {:?}",
2273 tool_definitions.len(),
2274 self.state.working_dir
2275 );
2276
2277 let (bridge_tx, _bridge_handle) = if let Some(ref ralph_tx) = self.event_tx {
2279 let (tx, handle) = Self::create_swarm_event_bridge(ralph_tx, story_id.to_string());
2280 (Some(tx), Some(handle))
2281 } else {
2282 (None, None)
2283 };
2284
2285 let (output, steps, tool_calls, exit_reason) = run_agent_loop(
2287 Arc::clone(&self.provider),
2288 &self.model,
2289 &system_prompt,
2290 prompt,
2291 tool_definitions,
2292 tool_registry, 30, 180, bridge_tx,
2296 story_id.to_string(),
2297 self.bus.clone(),
2298 Some(self.state.working_dir.clone()),
2299 )
2300 .await?;
2301
2302 match exit_reason {
2304 AgentLoopExit::Completed => {
2305 info!(
2306 "Ralph agent completed: {} steps, {} tool calls",
2307 steps, tool_calls
2308 );
2309 }
2310 AgentLoopExit::MaxStepsReached => {
2311 warn!(
2312 story_id = %story_id,
2313 steps = steps,
2314 tool_calls = tool_calls,
2315 "Ralph sub-agent hit max steps limit - work may be incomplete"
2316 );
2317 }
2318 AgentLoopExit::TimedOut => {
2319 warn!(
2320 story_id = %story_id,
2321 steps = steps,
2322 tool_calls = tool_calls,
2323 "Ralph sub-agent timed out - work may be incomplete"
2324 );
2325 }
2326 }
2327
2328 Ok(output)
2329 }
2330
2331 async fn run_quality_gates_with_events(&self, story_id: &str) -> anyhow::Result<bool> {
2333 self.run_quality_gates_in_dir_with_events(&self.state.working_dir, story_id)
2334 .await
2335 }
2336
2337 fn commit_story(&self, story: &UserStory) -> anyhow::Result<()> {
2339 info!("Committing changes for story: {}", story.id);
2340
2341 let _ = Command::new("git")
2343 .args(["add", "-A"])
2344 .current_dir(&self.state.working_dir)
2345 .output();
2346
2347 let msg = format!("feat({}): {}", story.id.to_lowercase(), story.title);
2349 let provenance = self.commit_provenance();
2350 match git_commit_with_provenance_blocking(&self.state.working_dir, &msg, Some(&provenance))
2351 {
2352 Ok(output) if output.status.success() => {
2353 info!("Committed: {}", msg);
2354 }
2355 Ok(output) => {
2356 warn!(
2357 "Git commit had no changes or failed: {}",
2358 String::from_utf8_lossy(&output.stderr)
2359 );
2360 }
2361 Err(e) => {
2362 warn!("Could not run git commit: {}", e);
2363 }
2364 }
2365
2366 Ok(())
2367 }
2368
2369 fn commit_provenance(&self) -> ExecutionProvenance {
2370 let mut provenance = ExecutionProvenance::for_operation("ralph", ExecutionOrigin::Ralph);
2371 provenance.set_run_id(self.run_id.clone());
2372 provenance
2373 }
2374
2375 fn git_checkout(&self, branch: &str) -> anyhow::Result<()> {
2377 let output = Command::new("git")
2379 .args(["checkout", branch])
2380 .current_dir(&self.state.working_dir)
2381 .output()?;
2382
2383 if !output.status.success() {
2384 Command::new("git")
2385 .args(["checkout", "-b", branch])
2386 .current_dir(&self.state.working_dir)
2387 .output()?;
2388 }
2389
2390 Ok(())
2391 }
2392
2393 fn load_progress(&self) -> anyhow::Result<String> {
2395 let path = self.state.working_dir.join(&self.config.progress_path);
2396 Ok(std::fs::read_to_string(path).unwrap_or_default())
2397 }
2398
2399 fn append_progress(&self, entry: &ProgressEntry, response: &str) -> anyhow::Result<()> {
2401 let path = self.state.working_dir.join(&self.config.progress_path);
2402 let mut content = self.load_progress().unwrap_or_default();
2403
2404 content.push_str(&format!(
2405 "\n---\n\n## Iteration {} - {} ({})\n\n**Status:** {}\n\n### Summary\n{}\n",
2406 entry.iteration, entry.story_id, entry.timestamp, entry.status, response
2407 ));
2408
2409 std::fs::write(path, content)?;
2410 Ok(())
2411 }
2412
2413 fn extract_learnings(&self, response: &str) -> Vec<String> {
2415 let mut learnings = Vec::new();
2416
2417 for line in response.lines() {
2418 if line.contains("learned") || line.contains("Learning") || line.contains("# What") {
2419 learnings.push(line.trim().to_string());
2420 }
2421 }
2422
2423 learnings
2424 }
2425
2426 pub fn status(&self) -> &RalphState {
2428 &self.state
2429 }
2430
2431 pub fn status_markdown(&self) -> String {
2433 let status = if self.state.prd.is_complete() {
2434 "# Ralph Complete!"
2435 } else {
2436 "# Ralph Status"
2437 };
2438
2439 let stories: Vec<String> = self
2440 .state
2441 .prd
2442 .user_stories
2443 .iter()
2444 .map(|s| {
2445 let check = if s.passes { "[x]" } else { "[ ]" };
2446 format!("- {} {}: {}", check, s.id, s.title)
2447 })
2448 .collect();
2449
2450 format!(
2451 "{}\n\n**Project:** {}\n**Feature:** {}\n**Progress:** {}/{} stories\n**Iterations:** {}/{}\n\n## Stories\n{}",
2452 status,
2453 self.state.prd.project,
2454 self.state.prd.feature,
2455 self.state.prd.passed_count(),
2456 self.state.prd.user_stories.len(),
2457 self.state.current_iteration,
2458 self.state.max_iterations,
2459 stories.join("\n")
2460 )
2461 }
2462}
2463
2464pub fn create_prd_template(project: &str, feature: &str) -> Prd {
2466 Prd {
2467 project: project.to_string(),
2468 feature: feature.to_string(),
2469 branch_name: format!("feature/{}", feature.to_lowercase().replace(' ', "-")),
2470 version: "1.0".to_string(),
2471 user_stories: vec![UserStory {
2472 id: "US-001".to_string(),
2473 title: "First user story".to_string(),
2474 description: "Description of what needs to be implemented".to_string(),
2475 acceptance_criteria: vec!["Criterion 1".to_string(), "Criterion 2".to_string()],
2476 verification_steps: Vec::new(),
2477 passes: false,
2478 priority: 1,
2479 depends_on: Vec::new(),
2480 complexity: 3,
2481 }],
2482 technical_requirements: Vec::new(),
2483 quality_checks: QualityChecks {
2484 typecheck: Some("cargo check".to_string()),
2485 test: Some("cargo test".to_string()),
2486 lint: Some("cargo clippy".to_string()),
2487 build: Some("cargo build".to_string()),
2488 },
2489 created_at: chrono::Utc::now().to_rfc3339(),
2490 updated_at: chrono::Utc::now().to_rfc3339(),
2491 }
2492}