1use super::state_store::{RalphRunState, RalphStateStore, StoryResultEntry};
4use super::types::*;
5use crate::bus::AgentBus;
6use crate::bus::relay::{ProtocolRelayRuntime, RelayAgentProfile};
7use crate::provenance::{
8 ExecutionOrigin, ExecutionProvenance, git_commit_with_provenance_blocking,
9};
10use crate::provider::{ContentPart, Message, Provider, ProviderRegistry, Role};
11use crate::session::{Session, SessionEvent};
12use crate::swarm::{executor::AgentLoopExit, run_agent_loop};
13use crate::tool::ToolRegistry;
14use crate::tui::ralph_view::{RalphEvent, RalphStoryInfo, RalphStoryStatus};
15use crate::tui::swarm_view::SwarmEvent;
16use crate::worktree::WorktreeManager;
17use futures::stream::{self, StreamExt};
18use std::collections::HashMap;
19use std::path::{Path, PathBuf};
20use std::process::Command;
21use std::sync::Arc;
22use tokio::sync::mpsc;
23use tracing::{debug, info, warn};
24
/// Drives a PRD-based autonomous coding loop ("Ralph"): repeatedly picks
/// user stories from a PRD, asks an LLM provider to implement them, checks
/// the result, and persists progress.
pub struct RalphLoop {
    /// Mutable run state: the loaded PRD, iteration counters, status,
    /// progress log, and the PRD/working-directory paths.
    state: RalphState,
    /// LLM provider used for single-agent story execution.
    provider: Arc<dyn Provider>,
    /// Model identifier passed to the provider.
    model: String,
    /// Loop configuration (iteration limits, parallelism, quality checks, ...).
    config: RalphConfig,
    /// Optional channel for UI events; sends are best-effort.
    event_tx: Option<mpsc::Sender<RalphEvent>>,
    /// Optional agent bus used to share learnings/handoffs between stories.
    bus: Option<Arc<AgentBus>>,
    /// Optional provider registry; required for relay (multi-agent) mode.
    registry: Option<Arc<ProviderRegistry>>,
    /// Optional persistent state store; writes are fire-and-forget.
    store: Option<Arc<dyn RalphStateStore>>,
    /// Identifier of this run in the state store (a random UUID by default).
    run_id: String,
}
37
38impl RalphLoop {
39 pub async fn new(
41 prd_path: PathBuf,
42 provider: Arc<dyn Provider>,
43 model: String,
44 config: RalphConfig,
45 ) -> anyhow::Result<Self> {
46 let prd = Prd::load(&prd_path).await?;
47
48 let working_dir = if let Some(parent) = prd_path.parent() {
50 if parent.as_os_str().is_empty() {
51 std::env::current_dir()?
52 } else {
53 parent.to_path_buf()
54 }
55 } else {
56 std::env::current_dir()?
57 };
58
59 info!(
60 "Loaded PRD: {} - {} ({} stories, {} already passed)",
61 prd.project,
62 prd.feature,
63 prd.user_stories.len(),
64 prd.passed_count()
65 );
66
67 let resumed_iterations = prd.passed_count();
69
70 let state = RalphState {
71 prd,
72 current_iteration: resumed_iterations,
73 max_iterations: config.max_iterations,
74 status: RalphStatus::Pending,
75 progress_log: Vec::new(),
76 prd_path: prd_path.clone(),
77 working_dir,
78 };
79
80 Ok(Self {
81 state,
82 provider,
83 model,
84 config,
85 event_tx: None,
86 bus: None,
87 registry: None,
88 store: None,
89 run_id: uuid::Uuid::new_v4().to_string(),
90 })
91 }
92
93 pub fn with_event_tx(mut self, tx: mpsc::Sender<RalphEvent>) -> Self {
95 self.event_tx = Some(tx);
96 self
97 }
98
99 pub fn with_bus(mut self, bus: Arc<AgentBus>) -> Self {
101 self.bus = Some(bus);
102 self
103 }
104
105 pub fn with_registry(mut self, registry: Arc<ProviderRegistry>) -> Self {
107 self.registry = Some(registry);
108 self
109 }
110
111 pub fn with_store(mut self, store: Arc<dyn RalphStateStore>) -> Self {
113 self.store = Some(store);
114 self
115 }
116
117 pub fn with_run_id(mut self, run_id: String) -> Self {
119 self.run_id = run_id;
120 self
121 }
122
123 fn store_fire_and_forget(
125 &self,
126 fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
127 ) {
128 tokio::spawn(async move {
129 if let Err(e) = fut.await {
130 warn!(error = %e, "State store update failed");
131 }
132 });
133 }
134
135 fn try_send_event(&self, event: RalphEvent) {
137 if let Some(ref tx) = self.event_tx {
138 let _ = tx.try_send(event);
139 }
140 }
141
142 fn bus_publish_story_result(
144 &self,
145 story: &UserStory,
146 iteration: usize,
147 learnings: &[String],
148 next_story_id: Option<&str>,
149 ) {
150 let Some(ref bus) = self.bus else { return };
151 let prd_id = &self.state.prd.project;
152 let handle = bus.handle(format!("ralph.{}", story.id));
153
154 handle.publish_ralph_learning(
156 prd_id,
157 &story.id,
158 iteration,
159 learnings.to_vec(),
160 serde_json::json!({
161 "story_title": story.title,
162 "passed": story.passes,
163 }),
164 );
165
166 if let Some(next_id) = next_story_id {
168 handle.publish_ralph_handoff(
169 prd_id,
170 &story.id,
171 next_id,
172 serde_json::json!({ "learnings": learnings }),
173 &format!(
174 "{} {} (iteration {})",
175 story.id,
176 if story.passes { "passed" } else { "failed" },
177 iteration
178 ),
179 );
180 }
181
182 handle.publish_ralph_progress(
184 prd_id,
185 self.state.prd.passed_count(),
186 self.state.prd.user_stories.len(),
187 iteration,
188 &format!("{:?}", self.state.status),
189 );
190 }
191
192 fn bus_collect_learnings(&self) -> Vec<String> {
194 let Some(ref bus) = self.bus else {
195 return Vec::new();
196 };
197 let prd_id = &self.state.prd.project;
198 let mut handle = bus.handle("ralph-collector");
199 let envelopes = handle.drain_ralph_learnings(prd_id);
200 let mut learnings = Vec::new();
201 for env in envelopes {
202 match env.message {
203 crate::bus::BusMessage::RalphLearning {
204 story_id,
205 iteration,
206 learnings: items,
207 ..
208 } => {
209 for l in items {
210 learnings.push(format!("[{story_id} iter {iteration}] {l}"));
211 }
212 }
213 crate::bus::BusMessage::RalphHandoff {
214 from_story,
215 progress_summary,
216 ..
217 } => {
218 learnings.push(format!("[handoff from {from_story}] {progress_summary}"));
219 }
220 _ => {}
221 }
222 }
223 learnings
224 }
225
226 fn create_swarm_event_bridge(
230 ralph_tx: &mpsc::Sender<RalphEvent>,
231 story_id: String,
232 ) -> (mpsc::Sender<SwarmEvent>, tokio::task::JoinHandle<()>) {
233 let (swarm_tx, mut swarm_rx) = mpsc::channel::<SwarmEvent>(100);
234 let ralph_tx = ralph_tx.clone();
235 let handle = tokio::spawn(async move {
236 while let Some(event) = swarm_rx.recv().await {
237 let ralph_event = match event {
238 SwarmEvent::AgentToolCall { tool_name, .. } => RalphEvent::StoryToolCall {
239 story_id: story_id.clone(),
240 tool_name,
241 },
242 SwarmEvent::AgentToolCallDetail { detail, .. } => {
243 RalphEvent::StoryToolCallDetail {
244 story_id: story_id.clone(),
245 detail,
246 }
247 }
248 SwarmEvent::AgentMessage { entry, .. } => RalphEvent::StoryMessage {
249 story_id: story_id.clone(),
250 entry,
251 },
252 SwarmEvent::AgentOutput { output, .. } => RalphEvent::StoryOutput {
253 story_id: story_id.clone(),
254 output,
255 },
256 SwarmEvent::AgentError { error, .. } => RalphEvent::StoryError {
257 story_id: story_id.clone(),
258 error,
259 },
260 _ => continue, };
262 if ralph_tx.send(ralph_event).await.is_err() {
263 break;
264 }
265 }
266 });
267 (swarm_tx, handle)
268 }
269
270 fn build_story_infos(prd: &Prd) -> Vec<RalphStoryInfo> {
272 prd.user_stories
273 .iter()
274 .map(|s| RalphStoryInfo {
275 id: s.id.clone(),
276 title: s.title.clone(),
277 status: if s.passes {
278 RalphStoryStatus::Passed
279 } else {
280 RalphStoryStatus::Pending
281 },
282 priority: s.priority,
283 depends_on: s.depends_on.clone(),
284 quality_checks: Vec::new(),
285 tool_call_history: Vec::new(),
286 messages: Vec::new(),
287 output: None,
288 error: None,
289 merge_summary: None,
290 steps: 0,
291 current_tool: None,
292 })
293 .collect()
294 }
295
296 pub async fn run(&mut self) -> anyhow::Result<RalphState> {
298 self.state.status = RalphStatus::Running;
299
300 if let Some(ref store) = self.store {
302 let initial_state = RalphRunState {
303 run_id: self.run_id.clone(),
304 okr_id: None,
305 prd: self.state.prd.clone(),
306 config: self.config.clone(),
307 status: RalphStatus::Running,
308 current_iteration: 0,
309 max_iterations: self.state.max_iterations,
310 progress_log: Vec::new(),
311 story_results: Vec::new(),
312 error: None,
313 created_at: chrono::Utc::now().to_rfc3339(),
314 started_at: Some(chrono::Utc::now().to_rfc3339()),
315 completed_at: None,
316 };
317 let s = store.clone();
318 self.store_fire_and_forget(async move { s.create_run(&initial_state).await });
319 }
320
321 self.try_send_event(RalphEvent::Started {
323 project: self.state.prd.project.clone(),
324 feature: self.state.prd.feature.clone(),
325 stories: Self::build_story_infos(&self.state.prd),
326 max_iterations: self.state.max_iterations,
327 });
328
329 if !self.state.prd.branch_name.is_empty() {
331 info!("Switching to branch: {}", self.state.prd.branch_name);
332 self.git_checkout(&self.state.prd.branch_name)?;
333 }
334
335 if self.config.parallel_enabled {
337 self.run_parallel().await?;
338 } else {
339 self.run_sequential().await?;
340 }
341
342 if self.state.status != RalphStatus::Completed {
346 let passed = self.state.prd.passed_count();
347 let total = self.state.prd.user_stories.len();
348
349 if self.state.prd.is_complete() {
350 self.state.status = RalphStatus::Completed;
351 } else if self.state.current_iteration >= self.state.max_iterations {
352 if self.state.current_iteration > 0 && passed == 0 && total > 0 {
353 self.state.status = RalphStatus::QualityFailed;
355 warn!(
356 iterations = self.state.current_iteration,
357 passed = passed,
358 total = total,
359 "Ralph failed: all stories failed quality checks"
360 );
361 } else {
362 self.state.status = RalphStatus::MaxIterations;
364 info!(
365 iterations = self.state.current_iteration,
366 passed = passed,
367 total = total,
368 "Ralph finished with partial progress"
369 );
370 }
371 } else if self.state.current_iteration > 0 {
372 if passed == 0 && total > 0 {
375 self.state.status = RalphStatus::QualityFailed;
376 warn!(
377 iterations = self.state.current_iteration,
378 passed = passed,
379 total = total,
380 "Ralph ended with no passing stories"
381 );
382 } else {
383 self.state.status = RalphStatus::MaxIterations;
384 info!(
385 iterations = self.state.current_iteration,
386 passed = passed,
387 total = total,
388 "Ralph ended before full completion"
389 );
390 }
391 } else {
392 self.state.status = RalphStatus::MaxIterations;
393 }
394 }
395
396 if self.config.worktree_enabled {
398 let mgr = WorktreeManager::new(&self.state.working_dir);
399 match mgr.cleanup_all().await {
400 Ok(count) if count > 0 => {
401 info!(cleaned = count, "Cleaned up orphaned worktrees/branches");
402 }
403 Ok(_) => {}
404 Err(e) => {
405 warn!(error = %e, "Failed to cleanup orphaned worktrees");
406 }
407 }
408 }
409
410 if let Some(ref store) = self.store {
412 let s = store.clone();
413 let rid = self.run_id.clone();
414 let status = self.state.status;
415 let prd = self.state.prd.clone();
416 self.store_fire_and_forget(async move {
417 s.update_prd(&rid, &prd).await?;
418 s.complete_run(&rid, status).await
419 });
420 }
421
422 info!(
423 "Ralph finished: {:?}, {}/{} stories passed",
424 self.state.status,
425 self.state.prd.passed_count(),
426 self.state.prd.user_stories.len()
427 );
428
429 self.try_send_event(RalphEvent::Complete {
431 status: format!("{:?}", self.state.status),
432 passed: self.state.prd.passed_count(),
433 total: self.state.prd.user_stories.len(),
434 });
435
436 Ok(self.state.clone())
437 }
438
439 async fn run_sequential(&mut self) -> anyhow::Result<()> {
441 while self.state.current_iteration < self.state.max_iterations {
442 self.state.current_iteration += 1;
443
444 if let Some(ref store) = self.store {
446 let s = store.clone();
447 let rid = self.run_id.clone();
448 let iter = self.state.current_iteration;
449 self.store_fire_and_forget(async move { s.update_iteration(&rid, iter).await });
450 }
451
452 info!(
453 "=== Ralph iteration {} of {} ===",
454 self.state.current_iteration, self.state.max_iterations
455 );
456
457 self.try_send_event(RalphEvent::IterationStarted {
459 iteration: self.state.current_iteration,
460 max_iterations: self.state.max_iterations,
461 });
462
463 if self.state.prd.is_complete() {
465 info!("All stories complete!");
466 self.state.status = RalphStatus::Completed;
467 break;
468 }
469
470 let story = match self.state.prd.next_story() {
472 Some(s) => s.clone(),
473 None => {
474 warn!("No available stories (dependencies not met)");
475 break;
476 }
477 };
478
479 info!("Working on story: {} - {}", story.id, story.title);
480
481 self.try_send_event(RalphEvent::StoryStarted {
483 story_id: story.id.clone(),
484 });
485
486 let bus_learnings = self.bus_collect_learnings();
488
489 let prompt = if bus_learnings.is_empty() {
491 self.build_prompt(&story)
492 } else {
493 let mut p = self.build_prompt(&story);
494 p.push_str("\n## Learnings from Previous Iterations:\n");
495 for l in &bus_learnings {
496 p.push_str(&format!("- {l}\n"));
497 }
498 p
499 };
500
501 match self.call_llm(&story.id, &prompt).await {
503 Ok(response) => {
504 let entry = ProgressEntry {
506 story_id: story.id.clone(),
507 iteration: self.state.current_iteration,
508 status: "completed".to_string(),
509 learnings: self.extract_learnings(&response),
510 files_changed: Vec::new(),
511 timestamp: chrono::Utc::now().to_rfc3339(),
512 };
513 self.append_progress(&entry, &response)?;
514 let entry_learnings = entry.learnings.clone();
515 self.state.progress_log.push(entry);
516
517 if self.config.quality_checks_enabled {
519 if self.run_quality_gates_with_events(&story.id).await? {
520 info!("Story {} passed quality checks!", story.id);
521 self.state.prd.mark_passed(&story.id);
522
523 if let Some(ref store) = self.store {
525 let s = store.clone();
526 let rid = self.run_id.clone();
527 let entry = StoryResultEntry {
528 story_id: story.id.clone(),
529 title: story.title.clone(),
530 passed: true,
531 iteration: self.state.current_iteration,
532 error: None,
533 };
534 self.store_fire_and_forget(async move {
535 s.record_story_result(&rid, &entry).await
536 });
537 }
538
539 self.try_send_event(RalphEvent::StoryComplete {
540 story_id: story.id.clone(),
541 passed: true,
542 });
543
544 let next = self.state.prd.next_story().map(|s| s.id.clone());
546 self.bus_publish_story_result(
547 &story,
548 self.state.current_iteration,
549 &entry_learnings,
550 next.as_deref(),
551 );
552
553 if self.config.auto_commit {
555 self.commit_story(&story)?;
556 }
557
558 self.state.prd.save(&self.state.prd_path).await?;
560 } else {
561 warn!("Story {} failed quality checks", story.id);
562
563 if let Some(ref store) = self.store {
565 let s = store.clone();
566 let rid = self.run_id.clone();
567 let entry = StoryResultEntry {
568 story_id: story.id.clone(),
569 title: story.title.clone(),
570 passed: false,
571 iteration: self.state.current_iteration,
572 error: Some("Quality checks failed".to_string()),
573 };
574 self.store_fire_and_forget(async move {
575 s.record_story_result(&rid, &entry).await
576 });
577 }
578
579 let next = self.state.prd.next_story().map(|s| s.id.clone());
581 self.bus_publish_story_result(
582 &story,
583 self.state.current_iteration,
584 &entry_learnings,
585 next.as_deref(),
586 );
587
588 self.try_send_event(RalphEvent::StoryComplete {
589 story_id: story.id.clone(),
590 passed: false,
591 });
592 }
593 } else {
594 self.state.prd.mark_passed(&story.id);
596 self.state.prd.save(&self.state.prd_path).await?;
597
598 if let Some(ref store) = self.store {
600 let s = store.clone();
601 let rid = self.run_id.clone();
602 let entry = StoryResultEntry {
603 story_id: story.id.clone(),
604 title: story.title.clone(),
605 passed: true,
606 iteration: self.state.current_iteration,
607 error: None,
608 };
609 self.store_fire_and_forget(async move {
610 s.record_story_result(&rid, &entry).await
611 });
612 }
613
614 self.try_send_event(RalphEvent::StoryComplete {
615 story_id: story.id.clone(),
616 passed: true,
617 });
618 }
619 }
620 Err(e) => {
621 warn!("LLM call failed: {}", e);
622 let entry = ProgressEntry {
623 story_id: story.id.clone(),
624 iteration: self.state.current_iteration,
625 status: format!("failed: {}", e),
626 learnings: Vec::new(),
627 files_changed: Vec::new(),
628 timestamp: chrono::Utc::now().to_rfc3339(),
629 };
630 self.state.progress_log.push(entry);
631
632 self.try_send_event(RalphEvent::StoryError {
633 story_id: story.id.clone(),
634 error: format!("{}", e),
635 });
636 }
637 }
638 }
639
640 Ok(())
641 }
642
643 async fn run_parallel(&mut self) -> anyhow::Result<()> {
645 let stages: Vec<Vec<UserStory>> = self
647 .state
648 .prd
649 .stages()
650 .into_iter()
651 .map(|stage| stage.into_iter().cloned().collect())
652 .collect();
653 let total_stages = stages.len();
654
655 info!(
656 "Parallel execution: {} stages, {} max concurrent stories",
657 total_stages, self.config.max_concurrent_stories
658 );
659
660 let worktree_mgr = if self.config.worktree_enabled {
662 let mgr = WorktreeManager::new(&self.state.working_dir);
663 info!("Worktree isolation enabled for parallel stories");
664 Some(Arc::new(mgr))
665 } else {
666 None
667 };
668
669 for (stage_idx, stage_stories) in stages.into_iter().enumerate() {
670 if self.state.prd.is_complete() {
671 info!("All stories complete!");
672 self.state.status = RalphStatus::Completed;
673 break;
674 }
675
676 if self.state.current_iteration >= self.state.max_iterations {
677 break;
678 }
679
680 let story_count = stage_stories.len();
681 info!(
682 "=== Stage {}/{}: {} stories in parallel ===",
683 stage_idx + 1,
684 total_stages,
685 story_count
686 );
687
688 let stories: Vec<UserStory> = stage_stories;
690
691 let mut worktrees_by_story: HashMap<String, crate::worktree::WorktreeInfo> =
697 HashMap::new();
698 if let Some(ref mgr) = worktree_mgr {
699 let ids: Vec<String> = stories.iter().map(|s| s.id.clone()).collect();
700 let created: Vec<(String, Option<crate::worktree::WorktreeInfo>)> =
701 stream::iter(ids.into_iter())
702 .map(|story_id| {
703 let mgr = Arc::clone(mgr);
704 async move {
705 let slug = story_id.to_lowercase().replace("-", "_");
706 match mgr.create(&slug).await {
707 Ok(wt) => {
708 if let Err(e) = mgr.inject_workspace_stub(&wt.path) {
709 warn!(
710 story_id = %story_id,
711 error = %e,
712 "Failed to inject workspace stub"
713 );
714 }
715 info!(
716 story_id = %story_id,
717 worktree_path = %wt.path.display(),
718 "Pre-created worktree for story"
719 );
720 (story_id, Some(wt))
721 }
722 Err(e) => {
723 warn!(
724 story_id = %story_id,
725 error = %e,
726 "Failed to pre-create worktree, will use main directory"
727 );
728 (story_id, None)
729 }
730 }
731 }
732 })
733 .buffer_unordered(8)
734 .collect()
735 .await;
736 for (sid, wt) in created {
737 if let Some(wt) = wt {
738 worktrees_by_story.insert(sid, wt);
739 }
740 }
741 }
742
743 let semaphore = Arc::new(tokio::sync::Semaphore::new(
745 self.config.max_concurrent_stories,
746 ));
747 let provider = Arc::clone(&self.provider);
748 let model = self.model.clone();
749 let prd_info = (
750 self.state.prd.project.clone(),
751 self.state.prd.feature.clone(),
752 );
753 let working_dir = self.state.working_dir.clone();
754 let progress_path = self.config.progress_path.clone();
755
756 let mut handles = Vec::new();
757
758 let accumulated_learnings = self.bus_collect_learnings();
760
761 for story in stories {
762 let sem = Arc::clone(&semaphore);
763 let provider = Arc::clone(&provider);
764 let model = model.clone();
765 let prd_info = prd_info.clone();
766 let working_dir = working_dir.clone();
767 let worktree_mgr = worktree_mgr.clone();
768 let prebuilt_worktree = worktrees_by_story.remove(&story.id);
769 let progress_path = progress_path.clone();
770 let ralph_tx = self.event_tx.clone();
771 let stage_learnings = accumulated_learnings.clone();
772 let bus = self.bus.clone();
773 let relay_enabled = self.config.relay_enabled;
774 let relay_max_agents = self.config.relay_max_agents;
775 let relay_max_rounds = self.config.relay_max_rounds;
776 let registry = self.registry.clone();
777 let max_steps_per_story = self.config.max_steps_per_story;
778 let max_quality_retries = self.config.max_quality_retries;
779 let max_rate_limit_retries = self.config.max_rate_limit_retries;
780 let rate_limit_base_delay_ms = self.config.rate_limit_base_delay_ms;
781 let quality_checks_enabled = self.config.quality_checks_enabled;
782 let quality_checks = self.state.prd.quality_checks.clone();
783
784 let handle: tokio::task::JoinHandle<(
785 crate::ralph::types::UserStory,
786 bool,
787 crate::ralph::types::ProgressEntry,
788 Option<crate::worktree::WorktreeInfo>,
789 Option<std::sync::Arc<crate::worktree::WorktreeManager>>,
790 bool, )> = tokio::spawn(async move {
792 let _permit = sem.acquire().await.expect("semaphore closed");
793
794 let (story_working_dir, worktree_info) = if let Some(wt) = prebuilt_worktree {
801 (wt.path.clone(), Some(wt))
802 } else {
803 (working_dir.clone(), None)
804 };
805
806 info!(
807 "Working on story: {} - {} (in {:?})",
808 story.id, story.title, story_working_dir
809 );
810
811 if let Some(ref tx) = ralph_tx {
813 let _ = tx
814 .send(RalphEvent::StoryStarted {
815 story_id: story.id.clone(),
816 })
817 .await;
818 }
819
820 let mut prompt =
822 Self::build_story_prompt(&story, &prd_info, &story_working_dir);
823 if !stage_learnings.is_empty() {
824 prompt.push_str("\n## Learnings from Previous Stages:\n");
825 for l in &stage_learnings {
826 prompt.push_str(&format!("- {l}\n"));
827 }
828 }
829
830 let (bridge_tx, _bridge_handle) = if !relay_enabled {
832 if let Some(ref tx) = ralph_tx {
833 let (btx, handle) =
834 Self::create_swarm_event_bridge(tx, story.id.clone());
835 (Some(btx), Some(handle))
836 } else {
837 (None, None)
838 }
839 } else {
840 (None, None)
841 };
842
843 let call_result = {
845 let mut last_err = None;
846 let mut attempt_result = None;
847 for attempt in 0..=max_rate_limit_retries {
848 let res = if relay_enabled {
849 if let Some(ref reg) = registry {
850 Self::call_relay_static(
851 reg,
852 &model,
853 &prompt,
854 &story_working_dir,
855 ralph_tx.clone(),
856 story.id.clone(),
857 bus.clone(),
858 relay_max_agents,
859 relay_max_rounds,
860 )
861 .await
862 } else {
863 warn!(
864 story_id = %story.id,
865 "Relay enabled but no registry available, using single agent"
866 );
867 Self::call_llm_static(
868 &provider,
869 &model,
870 &prompt,
871 &story_working_dir,
872 bridge_tx.clone(),
873 story.id.clone(),
874 bus.clone(),
875 max_steps_per_story,
876 )
877 .await
878 }
879 } else {
880 Self::call_llm_static(
881 &provider,
882 &model,
883 &prompt,
884 &story_working_dir,
885 bridge_tx.clone(),
886 story.id.clone(),
887 bus.clone(),
888 max_steps_per_story,
889 )
890 .await
891 };
892 match res {
893 Ok(output) => {
894 attempt_result = Some(Ok(output));
895 break;
896 }
897 Err(e) => {
898 let err_str = format!("{e}");
899 let is_rate_limit = err_str.contains("Rate limit")
900 || err_str.contains("rate_limit")
901 || err_str.contains("429")
902 || err_str.contains("too many requests");
903 if is_rate_limit && attempt < max_rate_limit_retries {
904 let delay_ms = rate_limit_base_delay_ms
905 * 2u64.saturating_pow(attempt as u32);
906 warn!(
907 story_id = %story.id,
908 attempt = attempt + 1,
909 max_retries = max_rate_limit_retries,
910 delay_ms = delay_ms,
911 "Rate limited, backing off before retry"
912 );
913 tokio::time::sleep(std::time::Duration::from_millis(
914 delay_ms,
915 ))
916 .await;
917 } else {
918 last_err = Some(e);
919 break;
920 }
921 }
922 }
923 }
924 attempt_result.unwrap_or_else(|| {
925 Err(last_err
926 .unwrap_or_else(|| anyhow::anyhow!("All retries exhausted")))
927 })
928 };
929
930 let (final_result, quality_passed) = match call_result {
933 Ok(initial_output) => {
934 let mut output = initial_output;
935 let mut qg_passed = !quality_checks_enabled; if quality_checks_enabled {
938 for qg_attempt in 0..=max_quality_retries {
939 let cargo_root = Self::find_cargo_root(&story_working_dir);
940 let checks = &quality_checks;
941 let mut all_ok = true;
942 let mut error_output = String::new();
943
944 for (name, cmd) in [
945 ("typecheck", &checks.typecheck),
946 ("lint", &checks.lint),
947 ("test", &checks.test),
948 ("build", &checks.build),
949 ] {
950 if let Some(command) = cmd {
951 let cmd_output = std::process::Command::new("/bin/sh")
952 .arg("-c")
953 .arg(command)
954 .current_dir(&cargo_root)
955 .output();
956 match cmd_output {
957 Ok(o) if o.status.success() => {}
958 Ok(o) => {
959 let stderr = String::from_utf8_lossy(&o.stderr);
960 let stdout = String::from_utf8_lossy(&o.stdout);
961 let combined = format!("{stdout}\n{stderr}");
962 let errors: String = combined
964 .lines()
965 .filter(|l| {
966 l.starts_with("error")
967 || l.contains("error:")
968 || l.contains("error[")
969 })
970 .take(20)
971 .collect::<Vec<_>>()
972 .join("\n");
973 error_output = format!(
974 "Quality check `{name}` failed:\n{errors}"
975 );
976 all_ok = false;
977 break;
978 }
979 Err(e) => {
980 error_output = format!(
981 "Quality check `{name}` could not run: {e}"
982 );
983 all_ok = false;
984 break;
985 }
986 }
987 }
988 }
989
990 if all_ok {
991 qg_passed = true;
992 info!(
993 story_id = %story.id,
994 attempt = qg_attempt + 1,
995 "Quality gates passed"
996 );
997 break;
998 }
999
1000 if qg_attempt < max_quality_retries {
1001 warn!(
1003 story_id = %story.id,
1004 attempt = qg_attempt + 1,
1005 max_retries = max_quality_retries,
1006 "Quality gates failed, spawning repair agent"
1007 );
1008
1009 let repair_prompt = format!(
1010 "# QUALITY GATE REPAIR (attempt {}/{})\n\n\
1011 The previous implementation attempt for story {} had quality gate failures.\n\n\
1012 ## Errors:\n```\n{}\n```\n\n\
1013 ## Instructions:\n\
1014 1. Read the files mentioned in the errors\n\
1015 2. Fix ONLY the compile/lint errors shown above\n\
1016 3. Run `cargo check 2>&1` to verify your fix\n\
1017 4. Do NOT refactor or change unrelated code\n\n\
1018 Working directory: {}",
1019 qg_attempt + 1,
1020 max_quality_retries,
1021 story.id,
1022 error_output,
1023 story_working_dir.display()
1024 );
1025
1026 let repair_result = Self::call_llm_static(
1028 &provider,
1029 &model,
1030 &repair_prompt,
1031 &story_working_dir,
1032 bridge_tx.clone(),
1033 story.id.clone(),
1034 bus.clone(),
1035 20, )
1037 .await;
1038
1039 match repair_result {
1040 Ok(repair_output) => {
1041 output = format!(
1042 "{output}\n---\n[repair attempt {}] {repair_output}",
1043 qg_attempt + 1
1044 );
1045 }
1046 Err(e) => {
1047 warn!(
1048 story_id = %story.id,
1049 error = %e,
1050 "Repair agent failed"
1051 );
1052 break;
1053 }
1054 }
1055 } else {
1056 warn!(
1057 story_id = %story.id,
1058 "Quality gates still failing after {} retries",
1059 max_quality_retries
1060 );
1061 }
1062 }
1063 }
1064
1065 (Ok(output), qg_passed)
1066 }
1067 Err(e) => (Err(e), false),
1068 };
1069
1070 let entry = match &final_result {
1071 Ok(response) => {
1072 let progress_file = story_working_dir.join(&progress_path);
1074 let _ = std::fs::write(&progress_file, response);
1075
1076 ProgressEntry {
1077 story_id: story.id.clone(),
1078 iteration: 1,
1079 status: "completed".to_string(),
1080 learnings: Self::extract_learnings_static(response),
1081 files_changed: Vec::new(),
1082 timestamp: chrono::Utc::now().to_rfc3339(),
1083 }
1084 }
1085 Err(e) => {
1086 warn!("LLM call failed for story {}: {}", story.id, e);
1087 ProgressEntry {
1088 story_id: story.id.clone(),
1089 iteration: 1,
1090 status: format!("failed: {}", e),
1091 learnings: Vec::new(),
1092 files_changed: Vec::new(),
1093 timestamp: chrono::Utc::now().to_rfc3339(),
1094 }
1095 }
1096 };
1097
1098 (
1099 story,
1100 final_result.is_ok(),
1101 entry,
1102 worktree_info,
1103 worktree_mgr,
1104 quality_passed,
1105 )
1106 });
1107
1108 handles.push(handle);
1109 }
1110
1111 let _stage_passed = 0usize;
1113 for handle in handles {
1114 match handle.await {
1115 Ok((story, success, entry, worktree_info, worktree_mgr, quality_passed)) => {
1116 self.state.progress_log.push(entry);
1117
1118 if success && quality_passed {
1119 {
1122 info!("Story {} passed quality checks!", story.id);
1123
1124 if let Some(ref wt) = worktree_info {
1126 let _ = self.commit_in_dir(&wt.path, &story);
1127 }
1128
1129 if let (Some(wt), Some(mgr)) =
1131 (worktree_info.as_ref(), worktree_mgr.as_ref())
1132 {
1133 match mgr.merge(&wt.name).await {
1134 Ok(merge_result) => {
1135 if merge_result.success {
1136 info!(
1137 story_id = %story.id,
1138 files_changed = merge_result.files_changed,
1139 "Merged story changes successfully"
1140 );
1141 self.state.prd.mark_passed(&story.id);
1142 self.try_send_event(RalphEvent::StoryMerge {
1143 story_id: story.id.clone(),
1144 success: true,
1145 summary: merge_result.summary.clone(),
1146 });
1147 self.try_send_event(RalphEvent::StoryComplete {
1148 story_id: story.id.clone(),
1149 passed: true,
1150 });
1151
1152 if let Some(ref store) = self.store {
1154 let s = store.clone();
1155 let rid = self.run_id.clone();
1156 let entry = StoryResultEntry {
1157 story_id: story.id.clone(),
1158 title: story.title.clone(),
1159 passed: true,
1160 iteration: self.state.current_iteration,
1161 error: None,
1162 };
1163 self.store_fire_and_forget(async move {
1164 s.record_story_result(&rid, &entry).await
1165 });
1166 }
1167
1168 let _ = mgr.cleanup(&wt.name).await;
1170 } else if !merge_result.conflicts.is_empty() {
1171 info!(
1173 story_id = %story.id,
1174 num_conflicts = merge_result.conflicts.len(),
1175 "Spawning conflict resolver sub-agent"
1176 );
1177
1178 match Self::resolve_conflicts_static(
1180 &provider,
1181 &model,
1182 &working_dir,
1183 &story,
1184 &merge_result.conflicts,
1185 &merge_result.conflict_diffs,
1186 self.bus.clone(),
1187 )
1188 .await
1189 {
1190 Ok(resolved) => {
1191 if resolved {
1192 let commit_msg = format!(
1194 "Merge: resolved conflicts for {}",
1195 story.id
1196 );
1197 match mgr
1198 .complete_merge(
1199 &wt.name,
1200 &commit_msg,
1201 )
1202 .await
1203 {
1204 Ok(final_result) => {
1205 if final_result.success {
1206 info!(
1207 story_id = %story.id,
1208 "Merge completed after conflict resolution"
1209 );
1210 self.state
1211 .prd
1212 .mark_passed(&story.id);
1213 } else {
1214 warn!(
1215 story_id = %story.id,
1216 "Merge failed even after resolution"
1217 );
1218 let _ = mgr
1219 .abort_merge(&wt.name)
1220 .await;
1221 }
1222 }
1223 Err(e) => {
1224 warn!(
1225 story_id = %story.id,
1226 error = %e,
1227 "Failed to complete merge after resolution"
1228 );
1229 let _ = mgr
1230 .abort_merge(&wt.name)
1231 .await;
1232 }
1233 }
1234 } else {
1235 warn!(
1236 story_id = %story.id,
1237 "Conflict resolver could not resolve all conflicts"
1238 );
1239 let _ = mgr.abort_merge(&wt.name).await;
1240 }
1241 }
1242 Err(e) => {
1243 warn!(
1244 story_id = %story.id,
1245 error = %e,
1246 "Conflict resolver failed"
1247 );
1248 let _ = mgr.abort_merge(&wt.name).await;
1249 }
1250 }
1251 let _ = mgr.cleanup(&wt.name).await;
1253 } else if merge_result.aborted {
1254 warn!(
1256 story_id = %story.id,
1257 summary = %merge_result.summary,
1258 "Merge was aborted due to non-conflict failure"
1259 );
1260 let _ = mgr.cleanup(&wt.name).await;
1262 } else {
1263 warn!(
1265 story_id = %story.id,
1266 summary = %merge_result.summary,
1267 "Merge failed but not aborted - manual intervention may be needed"
1268 );
1269 }
1271 }
1272 Err(e) => {
1273 warn!(
1274 story_id = %story.id,
1275 error = %e,
1276 "Failed to merge worktree"
1277 );
1278 }
1279 }
1280 } else {
1281 self.state.prd.mark_passed(&story.id);
1283 self.try_send_event(RalphEvent::StoryComplete {
1284 story_id: story.id.clone(),
1285 passed: true,
1286 });
1287
1288 if let Some(ref store) = self.store {
1290 let s = store.clone();
1291 let rid = self.run_id.clone();
1292 let entry = StoryResultEntry {
1293 story_id: story.id.clone(),
1294 title: story.title.clone(),
1295 passed: true,
1296 iteration: self.state.current_iteration,
1297 error: None,
1298 };
1299 self.store_fire_and_forget(async move {
1300 s.record_story_result(&rid, &entry).await
1301 });
1302 }
1303 }
1304 }
1305 } else {
1306 self.try_send_event(RalphEvent::StoryError {
1308 story_id: story.id.clone(),
1309 error: "LLM call failed".to_string(),
1310 });
1311 if let Some(ref wt) = worktree_info {
1312 info!(
1313 story_id = %story.id,
1314 worktree_path = %wt.path.display(),
1315 "Keeping worktree for debugging (story failed)"
1316 );
1317 }
1318 }
1319 }
1320 Err(e) => {
1321 warn!("Story execution task failed: {}", e);
1322 }
1323 }
1324 }
1325
1326 self.state.current_iteration += 1;
1328
1329 if self.bus.is_some() {
1331 let bus_learnings: Vec<String> = self
1332 .state
1333 .progress_log
1334 .iter()
1335 .flat_map(|e| e.learnings.clone())
1336 .collect();
1337 for story in &self.state.prd.user_stories {
1338 if story.passes {
1339 self.bus_publish_story_result(
1340 story,
1341 self.state.current_iteration,
1342 &bus_learnings,
1343 None,
1344 );
1345 }
1346 }
1347 }
1348
1349 self.state.prd.save(&self.state.prd_path).await?;
1351
1352 if let Some(ref store) = self.store {
1354 let s = store.clone();
1355 let rid = self.run_id.clone();
1356 let prd = self.state.prd.clone();
1357 let iter = self.state.current_iteration;
1358 self.store_fire_and_forget(async move {
1359 s.update_prd(&rid, &prd).await?;
1360 s.update_iteration(&rid, iter).await
1361 });
1362 }
1363 }
1364
1365 Ok(())
1366 }
1367
1368 fn build_story_prompt(
1370 story: &UserStory,
1371 prd_info: &(String, String),
1372 working_dir: &Path,
1373 ) -> String {
1374 let wd = working_dir.display();
1375 format!(
1376 r#"# PRD: {} - {}
1377
1378## Working Directory: {wd}
1379
1380## Current Story: {} - {}
1381
1382{}
1383
1384### Acceptance Criteria:
1385{}
1386
1387## WORKFLOW (follow this exactly):
1388
13891. **EXPLORE** (2-4 tool calls): Use `glob` and `read` to understand existing code
13902. **IMPLEMENT** (5-15 tool calls): Use `write` or `edit` to make changes
13913. **VERIFY**: Run a verification command appropriate to the language (see below)
13924. **FIX OR FINISH**:
1393 - If no errors: Output `STORY_COMPLETE: {}` and STOP
1394 - If errors: Parse the error, fix it, re-verify (max 3 fix attempts)
1395 - After 3 failed attempts: Output `STORY_BLOCKED: <error summary>` and STOP
1396
1397## VERIFICATION BY LANGUAGE:
1398- **Rust**: `bash` with `cargo check 2>&1`
1399- **Python**: `bash` with `python -c "import ast; ast.parse(open('FILE').read())"` for each changed file
1400- **TypeScript/JavaScript**: `bash` with `npx tsc --noEmit` (if tsconfig.json exists)
1401- Choose the right check for the files you modified
1402
1403## TOOL USAGE:
1404- `read`: Read file content (always read before editing!)
1405- `edit`: Modify files (MUST include 3+ lines before/after for unique context)
1406- `write`: Create new files
1407- `bash`: Run commands with `{{"command": "...", "cwd": "{wd}"}}`
1408
1409## CRITICAL RULES:
1410- ALWAYS read a file before editing it
1411- When edit fails with "ambiguous match", include MORE context lines
1412- Do NOT add TODO/placeholder comments
1413- Count your fix attempts - STOP after 3 failures
1414
1415## TERMINATION:
1416SUCCESS: Output `STORY_COMPLETE: {}`
1417BLOCKED: Output `STORY_BLOCKED: <brief error description>`
1418
1419Do NOT keep iterating indefinitely. Stop when done or blocked.
1420"#,
1421 prd_info.0,
1422 prd_info.1,
1423 story.id,
1424 story.title,
1425 story.description,
1426 story
1427 .acceptance_criteria
1428 .iter()
1429 .map(|c| format!("- {}", c))
1430 .collect::<Vec<_>>()
1431 .join("\n"),
1432 story.id,
1433 story.id
1434 )
1435 }
1436
1437 async fn call_llm_static(
1439 provider: &Arc<dyn Provider>,
1440 model: &str,
1441 prompt: &str,
1442 working_dir: &PathBuf,
1443 event_tx: Option<mpsc::Sender<SwarmEvent>>,
1444 story_id: String,
1445 bus: Option<Arc<AgentBus>>,
1446 max_steps_per_story: usize,
1447 ) -> anyhow::Result<String> {
1448 let system_prompt = crate::agent::builtin::build_system_prompt(working_dir);
1450
1451 let tool_registry =
1453 ToolRegistry::with_provider_arc(Arc::clone(provider), model.to_string());
1454
1455 let tool_definitions: Vec<_> = tool_registry
1457 .definitions()
1458 .into_iter()
1459 .filter(|t| t.name != "question")
1460 .collect();
1461
1462 info!(
1463 "Ralph sub-agent starting with {} tools in {:?}",
1464 tool_definitions.len(),
1465 working_dir
1466 );
1467
1468 let max_steps = max_steps_per_story;
1470 let (output, steps, tool_calls, _exit_reason) = run_agent_loop(
1471 Arc::clone(provider),
1472 model,
1473 &system_prompt,
1474 prompt,
1475 tool_definitions,
1476 tool_registry, max_steps,
1478 180, event_tx,
1480 story_id,
1481 bus.clone(),
1482 Some(working_dir.clone()),
1483 )
1484 .await?;
1485
1486 info!(
1487 "Ralph sub-agent completed: {} steps, {} tool calls",
1488 steps, tool_calls
1489 );
1490
1491 Ok(output)
1492 }
1493
1494 fn build_implementation_relay_profiles(
1496 max_agents: usize,
1497 working_dir: &std::path::Path,
1498 ) -> Vec<(String, String, Vec<String>)> {
1499 let wd = working_dir.display();
1500 let mut profiles = vec![
1501 (
1502 "auto-planner".to_string(),
1503 format!(
1504 "You are @auto-planner.\n\
1505 Specialty: Code analysis and implementation planning.\n\
1506 Mission: Read the codebase to understand existing patterns, then produce a concrete step-by-step implementation plan.\n\
1507 Working directory: {wd}\n\n\
1508 Read existing code first using the `read` tool. Identify all files that need changes.\n\
1509 Produce a numbered list of specific changes with file paths and code snippets.\n\
1510 Pass your plan in a clear format the next agent can follow step by step."
1511 ),
1512 vec!["planning".into(), "analysis".into(), "relay".into()],
1513 ),
1514 (
1515 "auto-coder".to_string(),
1516 format!(
1517 "You are @auto-coder.\n\
1518 Specialty: Code implementation.\n\
1519 Mission: Implement changes according to the plan, write production code, verify it compiles.\n\
1520 Working directory: {wd}\n\n\
1521 Follow the plan from the incoming handoff. Use `edit` and `write` tools to make changes.\n\
1522 After implementing, run `bash` with `cargo check 2>&1` (cwd: {wd}) to verify.\n\
1523 Fix any compilation errors. Report what you implemented."
1524 ),
1525 vec!["implementation".into(), "coding".into(), "relay".into()],
1526 ),
1527 (
1528 "auto-reviewer".to_string(),
1529 format!(
1530 "You are @auto-reviewer.\n\
1531 Specialty: Code review and quality verification.\n\
1532 Mission: Review implemented changes, run quality checks, fix remaining issues.\n\
1533 Working directory: {wd}\n\n\
1534 Review the code changes from the incoming handoff. Run `bash` with `cargo check 2>&1` (cwd: {wd}).\n\
1535 Fix any remaining issues. Verify the implementation meets the acceptance criteria.\n\
1536 If complete, include STORY_COMPLETE in your output. If blocked, include STORY_BLOCKED."
1537 ),
1538 vec!["review".into(), "verification".into(), "relay".into()],
1539 ),
1540 ];
1541
1542 if max_agents >= 4 {
1543 profiles.push((
1544 "auto-tester".to_string(),
1545 format!(
1546 "You are @auto-tester.\n\
1547 Specialty: Test writing and verification.\n\
1548 Mission: Write tests for the implemented changes and ensure they pass.\n\
1549 Working directory: {wd}\n\n\
1550 Review the implementation, write appropriate tests, run them with `bash`.\n\
1551 Report test results and coverage gaps."
1552 ),
1553 vec!["testing".into(), "relay".into()],
1554 ));
1555 }
1556
1557 if max_agents >= 5 {
1558 profiles.push((
1559 "auto-refactorer".to_string(),
1560 format!(
1561 "You are @auto-refactorer.\n\
1562 Specialty: Code quality and lint compliance.\n\
1563 Mission: Run clippy, fix warnings, improve code quality without changing behavior.\n\
1564 Working directory: {wd}\n\n\
1565 Run `bash` with `cargo clippy --all-features 2>&1` (cwd: {wd}).\n\
1566 Fix warnings and improve naming, structure, error handling."
1567 ),
1568 vec!["quality".into(), "relay".into()],
1569 ));
1570 }
1571
1572 profiles.truncate(max_agents);
1573 profiles
1574 }
1575
1576 fn normalize_for_relay(text: &str) -> String {
1578 let mut normalized = String::with_capacity(text.len().min(512));
1579 let mut last_was_space = false;
1580 for ch in text.chars() {
1581 if ch.is_ascii_alphanumeric() {
1582 normalized.push(ch.to_ascii_lowercase());
1583 last_was_space = false;
1584 } else if ch.is_whitespace() && !last_was_space {
1585 normalized.push(' ');
1586 last_was_space = true;
1587 }
1588 if normalized.len() >= 280 {
1589 break;
1590 }
1591 }
1592 normalized.trim().to_string()
1593 }
1594
1595 fn prepare_relay_handoff(from_agent: &str, output: &str, original_task: &str) -> String {
1597 let max_len = 6_000;
1598 let relay_payload = if output.len() > max_len {
1599 let truncated: String = output.chars().take(max_len).collect();
1600 format!("{truncated}\n... (truncated)")
1601 } else {
1602 output.to_string()
1603 };
1604
1605 format!(
1606 "Original task:\n{original_task}\n\n\
1607 Incoming handoff from @{from_agent}:\n{relay_payload}\n\n\
1608 Continue the work from this handoff. Keep your response focused and provide concrete next steps."
1609 )
1610 }
1611
1612 async fn call_relay_static(
1619 registry: &Arc<ProviderRegistry>,
1620 model: &str,
1621 prompt: &str,
1622 working_dir: &Path,
1623 ralph_tx: Option<mpsc::Sender<RalphEvent>>,
1624 story_id: String,
1625 bus: Option<Arc<AgentBus>>,
1626 max_agents: usize,
1627 max_rounds: usize,
1628 ) -> anyhow::Result<String> {
1629 let max_agents = max_agents.clamp(2, 8);
1630 let max_rounds = max_rounds.clamp(1, 5);
1631
1632 let profiles = Self::build_implementation_relay_profiles(max_agents, working_dir);
1633
1634 let relay_bus = bus.unwrap_or_else(|| Arc::new(AgentBus::new()));
1636 let relay = ProtocolRelayRuntime::new(relay_bus.clone());
1637
1638 let mut sessions: HashMap<String, Session> = HashMap::new();
1639 let mut ordered_agents: Vec<String> = Vec::new();
1640 let mut relay_profiles: Vec<RelayAgentProfile> = Vec::new();
1641
1642 for (name, instructions, capabilities) in &profiles {
1643 let mut session = Session::new().await?;
1644 session.metadata.model = Some(model.to_string());
1645 session.set_agent_name(name.clone());
1646 session.bus = Some(relay_bus.clone());
1647 session.add_message(Message {
1648 role: Role::System,
1649 content: vec![ContentPart::Text {
1650 text: instructions.clone(),
1651 }],
1652 });
1653
1654 relay_profiles.push(RelayAgentProfile {
1655 name: name.clone(),
1656 capabilities: capabilities.clone(),
1657 });
1658 ordered_agents.push(name.clone());
1659 sessions.insert(name.clone(), session);
1660 }
1661
1662 relay.register_agents(&relay_profiles);
1663
1664 info!(
1665 story_id = %story_id,
1666 agents = ordered_agents.len(),
1667 rounds = max_rounds,
1668 "Starting relay team for story"
1669 );
1670
1671 let mut baton = prompt.to_string();
1672 let mut previous_normalized: Option<String> = None;
1673 let mut convergence_hits = 0usize;
1674 let mut turns = 0usize;
1675
1676 'relay_loop: for round in 1..=max_rounds {
1677 for idx in 0..ordered_agents.len() {
1678 let to = ordered_agents[idx].clone();
1679 let from = if idx == 0 {
1680 if round == 1 {
1681 "user".to_string()
1682 } else {
1683 ordered_agents[ordered_agents.len() - 1].clone()
1684 }
1685 } else {
1686 ordered_agents[idx - 1].clone()
1687 };
1688
1689 turns += 1;
1690 relay.send_handoff(&from, &to, &baton);
1691
1692 if let Some(ref tx) = ralph_tx {
1693 let _ = tx
1694 .send(RalphEvent::StoryToolCall {
1695 story_id: story_id.clone(),
1696 tool_name: format!(
1697 "relay: @{from} → @{to} (round {round}/{max_rounds})"
1698 ),
1699 })
1700 .await;
1701 }
1702
1703 let Some(mut session) = sessions.remove(&to) else {
1704 anyhow::bail!("Relay agent @{to} session unavailable for story {story_id}");
1705 };
1706
1707 let (event_tx, mut event_rx) = mpsc::channel::<SessionEvent>(256);
1708 let registry_clone = registry.clone();
1709 let baton_clone = baton.clone();
1710
1711 let join = tokio::spawn(async move {
1712 let result = session
1713 .prompt_with_events(&baton_clone, event_tx, registry_clone)
1714 .await;
1715 (session, result)
1716 });
1717
1718 while !join.is_finished() {
1720 while let Ok(event) = event_rx.try_recv() {
1721 if let Some(ref tx) = ralph_tx
1722 && let SessionEvent::ToolCallStart { ref name, .. } = event
1723 {
1724 let _ = tx
1725 .send(RalphEvent::StoryToolCall {
1726 story_id: story_id.clone(),
1727 tool_name: format!("@{to}: {name}"),
1728 })
1729 .await;
1730 }
1731 }
1732 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1733 }
1734
1735 let (updated_session, result) = join
1736 .await
1737 .map_err(|e| anyhow::anyhow!("Relay agent @{to} task error: {e}"))?;
1738
1739 while event_rx.try_recv().is_ok() {}
1741
1742 sessions.insert(to.clone(), updated_session);
1743
1744 let output = result
1745 .map_err(|e| {
1746 anyhow::anyhow!("Relay agent @{to} failed on story {story_id}: {e}")
1747 })?
1748 .text;
1749
1750 let normalized = Self::normalize_for_relay(&output);
1752 if previous_normalized.as_deref() == Some(normalized.as_str()) {
1753 convergence_hits += 1;
1754 } else {
1755 convergence_hits = 0;
1756 }
1757 previous_normalized = Some(normalized);
1758
1759 if convergence_hits >= 2 {
1760 info!(story_id = %story_id, turns, "Relay converged");
1761 baton = output;
1762 break 'relay_loop;
1763 }
1764
1765 if output.contains("STORY_COMPLETE") || output.contains("STORY_BLOCKED") {
1767 info!(story_id = %story_id, turns, "Story reached terminal state via relay");
1768 baton = output;
1769 break 'relay_loop;
1770 }
1771
1772 baton = Self::prepare_relay_handoff(&to, &output, prompt);
1774 }
1775 }
1776
1777 relay.shutdown_agents(&ordered_agents);
1778
1779 info!(
1780 story_id = %story_id,
1781 turns,
1782 convergence_hits,
1783 "Relay team completed"
1784 );
1785
1786 Ok(baton)
1787 }
1788
1789 async fn resolve_conflicts_static(
1791 provider: &Arc<dyn Provider>,
1792 model: &str,
1793 working_dir: &Path,
1794 story: &UserStory,
1795 conflicts: &[String],
1796 conflict_diffs: &[(String, String)],
1797 bus: Option<Arc<AgentBus>>,
1798 ) -> anyhow::Result<bool> {
1799 info!(
1800 story_id = %story.id,
1801 num_conflicts = conflicts.len(),
1802 "Starting conflict resolution sub-agent"
1803 );
1804
1805 let conflict_info = conflict_diffs
1807 .iter()
1808 .map(|(file, diff)| format!("### File: {}\n```diff\n{}\n```", file, diff))
1809 .collect::<Vec<_>>()
1810 .join("\n\n");
1811
1812 let prompt = format!(
1813 r#"# CONFLICT RESOLUTION TASK
1814
1815## Story Context: {} - {}
1816{}
1817
1818## Conflicting Files
1819The following files have merge conflicts that need resolution:
1820{}
1821
1822## Conflict Details
1823{}
1824
1825## Your Task
18261. Read each conflicting file to see the conflict markers
18272. Understand what BOTH sides are trying to do:
1828 - HEAD (main branch): the current state
1829 - The incoming branch: the sub-agent's changes for story {}
18303. Resolve each conflict by:
1831 - Keeping BOTH changes if they don't actually conflict
1832 - Merging the logic if they touch the same code
1833 - Preferring the sub-agent's changes if they implement the story requirement
18344. Remove ALL conflict markers (<<<<<<<, =======, >>>>>>>)
18355. Ensure the final code compiles: run `cargo check`
1836
1837## CRITICAL RULES
1838- Do NOT leave any conflict markers in files
1839- Do NOT just pick one side - understand and merge the intent
1840- MUST run `cargo check` after resolving to verify
1841- Stage resolved files with `git add <file>`
1842
1843## Termination
1844SUCCESS: Output `CONFLICTS_RESOLVED` when all files are resolved and compile
1845FAILED: Output `CONFLICTS_UNRESOLVED: <reason>` if you cannot resolve
1846
1847Working directory: {}
1848"#,
1849 story.id,
1850 story.title,
1851 story.description,
1852 conflicts
1853 .iter()
1854 .map(|f| format!("- {}", f))
1855 .collect::<Vec<_>>()
1856 .join("\n"),
1857 conflict_info,
1858 story.id,
1859 working_dir.display()
1860 );
1861
1862 let system_prompt = crate::agent::builtin::build_system_prompt(working_dir);
1864
1865 let tool_registry =
1867 ToolRegistry::with_provider_arc(Arc::clone(provider), model.to_string());
1868
1869 let tool_definitions: Vec<_> = tool_registry
1870 .definitions()
1871 .into_iter()
1872 .filter(|t| t.name != "question")
1873 .collect();
1874
1875 info!(
1876 "Conflict resolver starting with {} tools",
1877 tool_definitions.len()
1878 );
1879
1880 let (output, steps, tool_calls, _exit_reason) = run_agent_loop(
1882 Arc::clone(provider),
1883 model,
1884 &system_prompt,
1885 &prompt,
1886 tool_definitions,
1887 tool_registry,
1888 15, 120, None,
1891 String::new(),
1892 bus.clone(),
1893 Some(working_dir.to_path_buf()),
1894 )
1895 .await?;
1896
1897 info!(
1898 story_id = %story.id,
1899 steps = steps,
1900 tool_calls = tool_calls,
1901 "Conflict resolver completed"
1902 );
1903
1904 let resolved = output.contains("CONFLICTS_RESOLVED")
1906 || (output.contains("resolved") && !output.contains("UNRESOLVED"));
1907
1908 if resolved {
1909 info!(story_id = %story.id, "Conflicts resolved successfully");
1910 } else {
1911 warn!(
1912 story_id = %story.id,
1913 output = %output.chars().take(200).collect::<String>(),
1914 "Conflict resolution may have failed"
1915 );
1916 }
1917
1918 Ok(resolved)
1919 }
1920
1921 fn extract_learnings_static(response: &str) -> Vec<String> {
1923 response
1924 .lines()
1925 .filter(|line| {
1926 line.contains("learned") || line.contains("Learning") || line.contains("# What")
1927 })
1928 .map(|line| line.trim().to_string())
1929 .collect()
1930 }
1931
1932 fn commit_in_dir(&self, dir: &PathBuf, story: &UserStory) -> anyhow::Result<()> {
1934 let _ = Command::new("git")
1936 .args(["add", "-A"])
1937 .current_dir(dir)
1938 .output();
1939
1940 let msg = format!("feat({}): {}", story.id.to_lowercase(), story.title);
1942 let provenance = self.commit_provenance();
1943 let _ = git_commit_with_provenance_blocking(dir, &msg, Some(&provenance));
1944
1945 Ok(())
1946 }
1947
1948 fn find_cargo_root(dir: &Path) -> PathBuf {
1953 fn has_cargo_toml(path: &Path) -> bool {
1954 path.join("Cargo.toml").exists()
1955 }
1956
1957 if has_cargo_toml(dir) {
1959 return dir.to_path_buf();
1960 }
1961
1962 if let Ok(entries) = std::fs::read_dir(dir) {
1964 let mut subdirs: Vec<_> = entries
1965 .filter_map(|e| e.ok())
1966 .filter(|e| e.path().is_dir())
1967 .map(|e| e.path())
1968 .collect();
1969
1970 subdirs.sort();
1972
1973 for subdir in subdirs {
1974 if has_cargo_toml(&subdir) {
1975 return subdir;
1976 }
1977
1978 if let Ok(sub_entries) = std::fs::read_dir(&subdir) {
1980 let mut sub_subdirs: Vec<_> = sub_entries
1981 .filter_map(|e| e.ok())
1982 .filter(|e| e.path().is_dir())
1983 .map(|e| e.path())
1984 .collect();
1985
1986 sub_subdirs.sort();
1987
1988 for sub_subdir in sub_subdirs {
1989 if has_cargo_toml(&sub_subdir) {
1990 return sub_subdir;
1991 }
1992 }
1993 }
1994 }
1995 }
1996
1997 warn!(
1999 dir = %dir.display(),
2000 "No Cargo.toml found, using worktree root for cargo commands"
2001 );
2002 dir.to_path_buf()
2003 }
2004
2005 async fn run_quality_gates_in_dir_with_events(
2007 &self,
2008 dir: &Path,
2009 story_id: &str,
2010 ) -> anyhow::Result<bool> {
2011 let cargo_root = Self::find_cargo_root(dir);
2013 debug!(
2014 worktree_dir = %dir.display(),
2015 cargo_root = %cargo_root.display(),
2016 "Running quality gates"
2017 );
2018
2019 let checks = &self.state.prd.quality_checks;
2020 let mut all_passed = true;
2021
2022 for (name, cmd) in [
2023 ("typecheck", &checks.typecheck),
2024 ("lint", &checks.lint),
2025 ("test", &checks.test),
2026 ("build", &checks.build),
2027 ] {
2028 if let Some(command) = cmd {
2029 debug!("Running {} check in {:?}: {}", name, cargo_root, command);
2030 let output = Command::new("/bin/sh")
2031 .arg("-c")
2032 .arg(command)
2033 .current_dir(&cargo_root)
2034 .output()
2035 .map_err(|e| {
2036 anyhow::anyhow!("Failed to run quality check '{}': {}", name, e)
2037 })?;
2038
2039 let passed = output.status.success();
2040 self.try_send_event(RalphEvent::StoryQualityCheck {
2041 story_id: story_id.to_string(),
2042 check_name: name.to_string(),
2043 passed,
2044 });
2045
2046 if !passed {
2047 let stderr = String::from_utf8_lossy(&output.stderr);
2048 let stdout = String::from_utf8_lossy(&output.stdout);
2049 let combined = format!("{}\n{}", stdout, stderr);
2050 let error_summary: String = combined
2051 .lines()
2052 .filter(|line| {
2053 line.starts_with("error")
2054 || line.contains("error:")
2055 || line.contains("error[")
2056 })
2057 .take(5)
2058 .collect::<Vec<_>>()
2059 .join("\n");
2060 warn!(
2061 check = %name,
2062 cargo_root = %cargo_root.display(),
2063 error_summary = %error_summary.chars().take(300).collect::<String>(),
2064 "{} check failed in {:?}",
2065 name,
2066 cargo_root
2067 );
2068 all_passed = false;
2069 break; }
2071 }
2072 }
2073
2074 Ok(all_passed)
2075 }
2076
2077 fn build_prompt(&self, story: &UserStory) -> String {
2079 let progress = self.load_progress().unwrap_or_default();
2080
2081 format!(
2082 r#"# PRD: {} - {}
2083
2084## Current Story: {} - {}
2085
2086{}
2087
2088### Acceptance Criteria:
2089{}
2090
2091## Previous Progress:
2092{}
2093
2094## Instructions:
20951. Implement the requirements for this story
20962. Write any necessary code changes
20973. Document what you learned
20984. End with `STORY_COMPLETE: {}` when done
2099
2100Respond with the implementation and any shell commands needed.
2101"#,
2102 self.state.prd.project,
2103 self.state.prd.feature,
2104 story.id,
2105 story.title,
2106 story.description,
2107 story
2108 .acceptance_criteria
2109 .iter()
2110 .map(|c| format!("- {}", c))
2111 .collect::<Vec<_>>()
2112 .join("\n"),
2113 if progress.is_empty() {
2114 "None yet".to_string()
2115 } else {
2116 progress
2117 },
2118 story.id
2119 )
2120 }
2121
2122 async fn call_llm(&self, story_id: &str, prompt: &str) -> anyhow::Result<String> {
2124 let system_prompt = crate::agent::builtin::build_system_prompt(&self.state.working_dir);
2126
2127 let tool_registry =
2129 ToolRegistry::with_provider_arc(Arc::clone(&self.provider), self.model.clone());
2130
2131 let tool_definitions: Vec<_> = tool_registry
2133 .definitions()
2134 .into_iter()
2135 .filter(|t| t.name != "question")
2136 .collect();
2137
2138 info!(
2139 "Ralph agent starting with {} tools in {:?}",
2140 tool_definitions.len(),
2141 self.state.working_dir
2142 );
2143
2144 let (bridge_tx, _bridge_handle) = if let Some(ref ralph_tx) = self.event_tx {
2146 let (tx, handle) = Self::create_swarm_event_bridge(ralph_tx, story_id.to_string());
2147 (Some(tx), Some(handle))
2148 } else {
2149 (None, None)
2150 };
2151
2152 let (output, steps, tool_calls, exit_reason) = run_agent_loop(
2154 Arc::clone(&self.provider),
2155 &self.model,
2156 &system_prompt,
2157 prompt,
2158 tool_definitions,
2159 tool_registry, 30, 180, bridge_tx,
2163 story_id.to_string(),
2164 self.bus.clone(),
2165 Some(self.state.working_dir.clone()),
2166 )
2167 .await?;
2168
2169 match exit_reason {
2171 AgentLoopExit::Completed => {
2172 info!(
2173 "Ralph agent completed: {} steps, {} tool calls",
2174 steps, tool_calls
2175 );
2176 }
2177 AgentLoopExit::MaxStepsReached => {
2178 warn!(
2179 story_id = %story_id,
2180 steps = steps,
2181 tool_calls = tool_calls,
2182 "Ralph sub-agent hit max steps limit - work may be incomplete"
2183 );
2184 }
2185 AgentLoopExit::TimedOut => {
2186 warn!(
2187 story_id = %story_id,
2188 steps = steps,
2189 tool_calls = tool_calls,
2190 "Ralph sub-agent timed out - work may be incomplete"
2191 );
2192 }
2193 }
2194
2195 Ok(output)
2196 }
2197
2198 async fn run_quality_gates_with_events(&self, story_id: &str) -> anyhow::Result<bool> {
2200 let cargo_root = Self::find_cargo_root(&self.state.working_dir);
2202 debug!(
2203 working_dir = %self.state.working_dir.display(),
2204 cargo_root = %cargo_root.display(),
2205 "Running quality gates"
2206 );
2207
2208 let checks = &self.state.prd.quality_checks;
2209 let mut all_passed = true;
2210
2211 for (name, cmd) in [
2212 ("typecheck", &checks.typecheck),
2213 ("lint", &checks.lint),
2214 ("test", &checks.test),
2215 ("build", &checks.build),
2216 ] {
2217 if let Some(command) = cmd {
2218 debug!("Running {} check in {:?}: {}", name, cargo_root, command);
2219 let output = Command::new("/bin/sh")
2220 .arg("-c")
2221 .arg(command)
2222 .current_dir(&cargo_root)
2223 .output()
2224 .map_err(|e| {
2225 anyhow::anyhow!("Failed to run quality check '{}': {}", name, e)
2226 })?;
2227
2228 let passed = output.status.success();
2229 self.try_send_event(RalphEvent::StoryQualityCheck {
2230 story_id: story_id.to_string(),
2231 check_name: name.to_string(),
2232 passed,
2233 });
2234
2235 if !passed {
2236 let stderr = String::from_utf8_lossy(&output.stderr);
2237 let stdout = String::from_utf8_lossy(&output.stdout);
2238 let combined = format!("{}\n{}", stdout, stderr);
2239 let error_summary: String = combined
2240 .lines()
2241 .filter(|line| {
2242 line.starts_with("error")
2243 || line.contains("error:")
2244 || line.contains("error[")
2245 })
2246 .take(5)
2247 .collect::<Vec<_>>()
2248 .join("\n");
2249 warn!(
2250 check = %name,
2251 cargo_root = %cargo_root.display(),
2252 error_summary = %error_summary.chars().take(300).collect::<String>(),
2253 "{} check failed in {:?}",
2254 name, cargo_root
2255 );
2256 all_passed = false;
2257 break; }
2259 }
2260 }
2261
2262 Ok(all_passed)
2263 }
2264
2265 fn commit_story(&self, story: &UserStory) -> anyhow::Result<()> {
2267 info!("Committing changes for story: {}", story.id);
2268
2269 let _ = Command::new("git")
2271 .args(["add", "-A"])
2272 .current_dir(&self.state.working_dir)
2273 .output();
2274
2275 let msg = format!("feat({}): {}", story.id.to_lowercase(), story.title);
2277 let provenance = self.commit_provenance();
2278 match git_commit_with_provenance_blocking(&self.state.working_dir, &msg, Some(&provenance))
2279 {
2280 Ok(output) if output.status.success() => {
2281 info!("Committed: {}", msg);
2282 }
2283 Ok(output) => {
2284 warn!(
2285 "Git commit had no changes or failed: {}",
2286 String::from_utf8_lossy(&output.stderr)
2287 );
2288 }
2289 Err(e) => {
2290 warn!("Could not run git commit: {}", e);
2291 }
2292 }
2293
2294 Ok(())
2295 }
2296
2297 fn commit_provenance(&self) -> ExecutionProvenance {
2298 let mut provenance = ExecutionProvenance::for_operation("ralph", ExecutionOrigin::Ralph);
2299 provenance.set_run_id(self.run_id.clone());
2300 provenance
2301 }
2302
2303 fn git_checkout(&self, branch: &str) -> anyhow::Result<()> {
2305 let output = Command::new("git")
2307 .args(["checkout", branch])
2308 .current_dir(&self.state.working_dir)
2309 .output()?;
2310
2311 if !output.status.success() {
2312 Command::new("git")
2313 .args(["checkout", "-b", branch])
2314 .current_dir(&self.state.working_dir)
2315 .output()?;
2316 }
2317
2318 Ok(())
2319 }
2320
2321 fn load_progress(&self) -> anyhow::Result<String> {
2323 let path = self.state.working_dir.join(&self.config.progress_path);
2324 Ok(std::fs::read_to_string(path).unwrap_or_default())
2325 }
2326
2327 fn append_progress(&self, entry: &ProgressEntry, response: &str) -> anyhow::Result<()> {
2329 let path = self.state.working_dir.join(&self.config.progress_path);
2330 let mut content = self.load_progress().unwrap_or_default();
2331
2332 content.push_str(&format!(
2333 "\n---\n\n## Iteration {} - {} ({})\n\n**Status:** {}\n\n### Summary\n{}\n",
2334 entry.iteration, entry.story_id, entry.timestamp, entry.status, response
2335 ));
2336
2337 std::fs::write(path, content)?;
2338 Ok(())
2339 }
2340
2341 fn extract_learnings(&self, response: &str) -> Vec<String> {
2343 let mut learnings = Vec::new();
2344
2345 for line in response.lines() {
2346 if line.contains("learned") || line.contains("Learning") || line.contains("# What") {
2347 learnings.push(line.trim().to_string());
2348 }
2349 }
2350
2351 learnings
2352 }
2353
2354 pub fn status(&self) -> &RalphState {
2356 &self.state
2357 }
2358
2359 pub fn status_markdown(&self) -> String {
2361 let status = if self.state.prd.is_complete() {
2362 "# Ralph Complete!"
2363 } else {
2364 "# Ralph Status"
2365 };
2366
2367 let stories: Vec<String> = self
2368 .state
2369 .prd
2370 .user_stories
2371 .iter()
2372 .map(|s| {
2373 let check = if s.passes { "[x]" } else { "[ ]" };
2374 format!("- {} {}: {}", check, s.id, s.title)
2375 })
2376 .collect();
2377
2378 format!(
2379 "{}\n\n**Project:** {}\n**Feature:** {}\n**Progress:** {}/{} stories\n**Iterations:** {}/{}\n\n## Stories\n{}",
2380 status,
2381 self.state.prd.project,
2382 self.state.prd.feature,
2383 self.state.prd.passed_count(),
2384 self.state.prd.user_stories.len(),
2385 self.state.current_iteration,
2386 self.state.max_iterations,
2387 stories.join("\n")
2388 )
2389 }
2390}
2391
2392pub fn create_prd_template(project: &str, feature: &str) -> Prd {
2394 Prd {
2395 project: project.to_string(),
2396 feature: feature.to_string(),
2397 branch_name: format!("feature/{}", feature.to_lowercase().replace(' ', "-")),
2398 version: "1.0".to_string(),
2399 user_stories: vec![UserStory {
2400 id: "US-001".to_string(),
2401 title: "First user story".to_string(),
2402 description: "Description of what needs to be implemented".to_string(),
2403 acceptance_criteria: vec!["Criterion 1".to_string(), "Criterion 2".to_string()],
2404 verification_steps: Vec::new(),
2405 passes: false,
2406 priority: 1,
2407 depends_on: Vec::new(),
2408 complexity: 3,
2409 }],
2410 technical_requirements: Vec::new(),
2411 quality_checks: QualityChecks {
2412 typecheck: Some("cargo check".to_string()),
2413 test: Some("cargo test".to_string()),
2414 lint: Some("cargo clippy".to_string()),
2415 build: Some("cargo build".to_string()),
2416 },
2417 created_at: chrono::Utc::now().to_rfc3339(),
2418 updated_at: chrono::Utc::now().to_rfc3339(),
2419 }
2420}