1use super::state_store::{RalphRunState, RalphStateStore, StoryResultEntry};
4use super::types::*;
5use crate::bus::AgentBus;
6use crate::bus::relay::{ProtocolRelayRuntime, RelayAgentProfile};
7use crate::provider::{ContentPart, Message, Provider, ProviderRegistry, Role};
8use crate::session::{Session, SessionEvent};
9use crate::swarm::{executor::AgentLoopExit, run_agent_loop};
10use crate::tool::ToolRegistry;
11use crate::tui::ralph_view::{RalphEvent, RalphStoryInfo, RalphStoryStatus};
12use crate::tui::swarm_view::SwarmEvent;
13use crate::worktree::WorktreeManager;
14use std::collections::HashMap;
15use std::path::PathBuf;
16use std::process::Command;
17use std::sync::Arc;
18use tokio::sync::mpsc;
19use tracing::{debug, info, warn};
20
/// Drives a PRD (a list of user stories) through repeated agent runs until the
/// stories pass, the iteration budget is spent, or no story can be scheduled.
///
/// Built with [`RalphLoop::new`] and optionally extended through the `with_*`
/// builder methods before calling `run`.
pub struct RalphLoop {
    /// Mutable run state: the loaded PRD, iteration counters, status,
    /// progress log, the PRD path and the working directory.
    state: RalphState,
    /// Provider handed to the agent loop and to the tool registry.
    provider: Arc<dyn Provider>,
    /// Model identifier passed along with `provider`.
    model: String,
    /// Run configuration (iteration limit, parallel/worktree/relay switches,
    /// quality-check and auto-commit flags, ...).
    config: RalphConfig,
    /// Optional channel on which `RalphEvent`s are emitted; `None` disables events.
    event_tx: Option<mpsc::Sender<RalphEvent>>,
    /// Optional agent bus used to publish and collect learnings between stories.
    bus: Option<Arc<AgentBus>>,
    /// Optional provider registry; consulted when relay mode is enabled.
    registry: Option<Arc<ProviderRegistry>>,
    /// Optional state store that receives fire-and-forget run updates.
    store: Option<Arc<dyn RalphStateStore>>,
    /// Identifier of this run in the state store; a fresh UUID v4 unless
    /// overridden with `with_run_id`.
    run_id: String,
}
33
34impl RalphLoop {
35 pub async fn new(
37 prd_path: PathBuf,
38 provider: Arc<dyn Provider>,
39 model: String,
40 config: RalphConfig,
41 ) -> anyhow::Result<Self> {
42 let prd = Prd::load(&prd_path).await?;
43
44 let working_dir = if let Some(parent) = prd_path.parent() {
46 if parent.as_os_str().is_empty() {
47 std::env::current_dir()?
48 } else {
49 parent.to_path_buf()
50 }
51 } else {
52 std::env::current_dir()?
53 };
54
55 info!(
56 "Loaded PRD: {} - {} ({} stories)",
57 prd.project,
58 prd.feature,
59 prd.user_stories.len()
60 );
61
62 let state = RalphState {
63 prd,
64 current_iteration: 0,
65 max_iterations: config.max_iterations,
66 status: RalphStatus::Pending,
67 progress_log: Vec::new(),
68 prd_path: prd_path.clone(),
69 working_dir,
70 };
71
72 Ok(Self {
73 state,
74 provider,
75 model,
76 config,
77 event_tx: None,
78 bus: None,
79 registry: None,
80 store: None,
81 run_id: uuid::Uuid::new_v4().to_string(),
82 })
83 }
84
85 pub fn with_event_tx(mut self, tx: mpsc::Sender<RalphEvent>) -> Self {
87 self.event_tx = Some(tx);
88 self
89 }
90
91 pub fn with_bus(mut self, bus: Arc<AgentBus>) -> Self {
93 self.bus = Some(bus);
94 self
95 }
96
97 pub fn with_registry(mut self, registry: Arc<ProviderRegistry>) -> Self {
99 self.registry = Some(registry);
100 self
101 }
102
103 pub fn with_store(mut self, store: Arc<dyn RalphStateStore>) -> Self {
105 self.store = Some(store);
106 self
107 }
108
109 pub fn with_run_id(mut self, run_id: String) -> Self {
111 self.run_id = run_id;
112 self
113 }
114
115 fn store_fire_and_forget(
117 &self,
118 fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
119 ) {
120 tokio::spawn(async move {
121 if let Err(e) = fut.await {
122 warn!(error = %e, "State store update failed");
123 }
124 });
125 }
126
127 fn try_send_event(&self, event: RalphEvent) {
129 if let Some(ref tx) = self.event_tx {
130 let _ = tx.try_send(event);
131 }
132 }
133
134 fn bus_publish_story_result(
136 &self,
137 story: &UserStory,
138 iteration: usize,
139 learnings: &[String],
140 next_story_id: Option<&str>,
141 ) {
142 let Some(ref bus) = self.bus else { return };
143 let prd_id = &self.state.prd.project;
144 let handle = bus.handle(format!("ralph.{}", story.id));
145
146 handle.publish_ralph_learning(
148 prd_id,
149 &story.id,
150 iteration,
151 learnings.to_vec(),
152 serde_json::json!({
153 "story_title": story.title,
154 "passed": story.passes,
155 }),
156 );
157
158 if let Some(next_id) = next_story_id {
160 handle.publish_ralph_handoff(
161 prd_id,
162 &story.id,
163 next_id,
164 serde_json::json!({ "learnings": learnings }),
165 &format!(
166 "{} {} (iteration {})",
167 story.id,
168 if story.passes { "passed" } else { "failed" },
169 iteration
170 ),
171 );
172 }
173
174 handle.publish_ralph_progress(
176 prd_id,
177 self.state.prd.passed_count(),
178 self.state.prd.user_stories.len(),
179 iteration,
180 &format!("{:?}", self.state.status),
181 );
182 }
183
184 fn bus_collect_learnings(&self) -> Vec<String> {
186 let Some(ref bus) = self.bus else {
187 return Vec::new();
188 };
189 let prd_id = &self.state.prd.project;
190 let mut handle = bus.handle("ralph-collector");
191 let envelopes = handle.drain_ralph_learnings(prd_id);
192 let mut learnings = Vec::new();
193 for env in envelopes {
194 match env.message {
195 crate::bus::BusMessage::RalphLearning {
196 story_id,
197 iteration,
198 learnings: items,
199 ..
200 } => {
201 for l in items {
202 learnings.push(format!("[{story_id} iter {iteration}] {l}"));
203 }
204 }
205 crate::bus::BusMessage::RalphHandoff {
206 from_story,
207 progress_summary,
208 ..
209 } => {
210 learnings.push(format!("[handoff from {from_story}] {progress_summary}"));
211 }
212 _ => {}
213 }
214 }
215 learnings
216 }
217
218 fn create_swarm_event_bridge(
222 ralph_tx: &mpsc::Sender<RalphEvent>,
223 story_id: String,
224 ) -> (mpsc::Sender<SwarmEvent>, tokio::task::JoinHandle<()>) {
225 let (swarm_tx, mut swarm_rx) = mpsc::channel::<SwarmEvent>(100);
226 let ralph_tx = ralph_tx.clone();
227 let handle = tokio::spawn(async move {
228 while let Some(event) = swarm_rx.recv().await {
229 let ralph_event = match event {
230 SwarmEvent::AgentToolCall { tool_name, .. } => RalphEvent::StoryToolCall {
231 story_id: story_id.clone(),
232 tool_name,
233 },
234 SwarmEvent::AgentToolCallDetail { detail, .. } => {
235 RalphEvent::StoryToolCallDetail {
236 story_id: story_id.clone(),
237 detail,
238 }
239 }
240 SwarmEvent::AgentMessage { entry, .. } => RalphEvent::StoryMessage {
241 story_id: story_id.clone(),
242 entry,
243 },
244 SwarmEvent::AgentOutput { output, .. } => RalphEvent::StoryOutput {
245 story_id: story_id.clone(),
246 output,
247 },
248 SwarmEvent::AgentError { error, .. } => RalphEvent::StoryError {
249 story_id: story_id.clone(),
250 error,
251 },
252 _ => continue, };
254 if ralph_tx.send(ralph_event).await.is_err() {
255 break;
256 }
257 }
258 });
259 (swarm_tx, handle)
260 }
261
262 fn build_story_infos(prd: &Prd) -> Vec<RalphStoryInfo> {
264 prd.user_stories
265 .iter()
266 .map(|s| RalphStoryInfo {
267 id: s.id.clone(),
268 title: s.title.clone(),
269 status: if s.passes {
270 RalphStoryStatus::Passed
271 } else {
272 RalphStoryStatus::Pending
273 },
274 priority: s.priority,
275 depends_on: s.depends_on.clone(),
276 quality_checks: Vec::new(),
277 tool_call_history: Vec::new(),
278 messages: Vec::new(),
279 output: None,
280 error: None,
281 merge_summary: None,
282 steps: 0,
283 current_tool: None,
284 })
285 .collect()
286 }
287
288 pub async fn run(&mut self) -> anyhow::Result<RalphState> {
290 self.state.status = RalphStatus::Running;
291
292 if let Some(ref store) = self.store {
294 let initial_state = RalphRunState {
295 run_id: self.run_id.clone(),
296 okr_id: None,
297 prd: self.state.prd.clone(),
298 config: self.config.clone(),
299 status: RalphStatus::Running,
300 current_iteration: 0,
301 max_iterations: self.state.max_iterations,
302 progress_log: Vec::new(),
303 story_results: Vec::new(),
304 error: None,
305 created_at: chrono::Utc::now().to_rfc3339(),
306 started_at: Some(chrono::Utc::now().to_rfc3339()),
307 completed_at: None,
308 };
309 let s = store.clone();
310 self.store_fire_and_forget(async move { s.create_run(&initial_state).await });
311 }
312
313 self.try_send_event(RalphEvent::Started {
315 project: self.state.prd.project.clone(),
316 feature: self.state.prd.feature.clone(),
317 stories: Self::build_story_infos(&self.state.prd),
318 max_iterations: self.state.max_iterations,
319 });
320
321 if !self.state.prd.branch_name.is_empty() {
323 info!("Switching to branch: {}", self.state.prd.branch_name);
324 self.git_checkout(&self.state.prd.branch_name)?;
325 }
326
327 if self.config.parallel_enabled {
329 self.run_parallel().await?;
330 } else {
331 self.run_sequential().await?;
332 }
333
334 if self.state.status != RalphStatus::Completed {
338 let passed = self.state.prd.passed_count();
339 let total = self.state.prd.user_stories.len();
340
341 if self.state.prd.is_complete() {
342 self.state.status = RalphStatus::Completed;
343 } else if self.state.current_iteration >= self.state.max_iterations {
344 if self.state.current_iteration > 0 && passed == 0 && total > 0 {
345 self.state.status = RalphStatus::QualityFailed;
347 warn!(
348 iterations = self.state.current_iteration,
349 passed = passed,
350 total = total,
351 "Ralph failed: all stories failed quality checks"
352 );
353 } else {
354 self.state.status = RalphStatus::MaxIterations;
356 info!(
357 iterations = self.state.current_iteration,
358 passed = passed,
359 total = total,
360 "Ralph finished with partial progress"
361 );
362 }
363 } else if self.state.current_iteration > 0 {
364 if passed == 0 && total > 0 {
367 self.state.status = RalphStatus::QualityFailed;
368 warn!(
369 iterations = self.state.current_iteration,
370 passed = passed,
371 total = total,
372 "Ralph ended with no passing stories"
373 );
374 } else {
375 self.state.status = RalphStatus::MaxIterations;
376 info!(
377 iterations = self.state.current_iteration,
378 passed = passed,
379 total = total,
380 "Ralph ended before full completion"
381 );
382 }
383 } else {
384 self.state.status = RalphStatus::MaxIterations;
385 }
386 }
387
388 if self.config.worktree_enabled {
390 let mgr = WorktreeManager::new(&self.state.working_dir);
391 match mgr.cleanup_all().await {
392 Ok(count) if count > 0 => {
393 info!(cleaned = count, "Cleaned up orphaned worktrees/branches");
394 }
395 Ok(_) => {}
396 Err(e) => {
397 warn!(error = %e, "Failed to cleanup orphaned worktrees");
398 }
399 }
400 }
401
402 if let Some(ref store) = self.store {
404 let s = store.clone();
405 let rid = self.run_id.clone();
406 let status = self.state.status;
407 let prd = self.state.prd.clone();
408 self.store_fire_and_forget(async move {
409 s.update_prd(&rid, &prd).await?;
410 s.complete_run(&rid, status).await
411 });
412 }
413
414 info!(
415 "Ralph finished: {:?}, {}/{} stories passed",
416 self.state.status,
417 self.state.prd.passed_count(),
418 self.state.prd.user_stories.len()
419 );
420
421 self.try_send_event(RalphEvent::Complete {
423 status: format!("{:?}", self.state.status),
424 passed: self.state.prd.passed_count(),
425 total: self.state.prd.user_stories.len(),
426 });
427
428 Ok(self.state.clone())
429 }
430
431 async fn run_sequential(&mut self) -> anyhow::Result<()> {
433 while self.state.current_iteration < self.state.max_iterations {
434 self.state.current_iteration += 1;
435
436 if let Some(ref store) = self.store {
438 let s = store.clone();
439 let rid = self.run_id.clone();
440 let iter = self.state.current_iteration;
441 self.store_fire_and_forget(async move { s.update_iteration(&rid, iter).await });
442 }
443
444 info!(
445 "=== Ralph iteration {} of {} ===",
446 self.state.current_iteration, self.state.max_iterations
447 );
448
449 self.try_send_event(RalphEvent::IterationStarted {
451 iteration: self.state.current_iteration,
452 max_iterations: self.state.max_iterations,
453 });
454
455 if self.state.prd.is_complete() {
457 info!("All stories complete!");
458 self.state.status = RalphStatus::Completed;
459 break;
460 }
461
462 let story = match self.state.prd.next_story() {
464 Some(s) => s.clone(),
465 None => {
466 warn!("No available stories (dependencies not met)");
467 break;
468 }
469 };
470
471 info!("Working on story: {} - {}", story.id, story.title);
472
473 self.try_send_event(RalphEvent::StoryStarted {
475 story_id: story.id.clone(),
476 });
477
478 let bus_learnings = self.bus_collect_learnings();
480
481 let prompt = if bus_learnings.is_empty() {
483 self.build_prompt(&story)
484 } else {
485 let mut p = self.build_prompt(&story);
486 p.push_str("\n## Learnings from Previous Iterations:\n");
487 for l in &bus_learnings {
488 p.push_str(&format!("- {l}\n"));
489 }
490 p
491 };
492
493 match self.call_llm(&story.id, &prompt).await {
495 Ok(response) => {
496 let entry = ProgressEntry {
498 story_id: story.id.clone(),
499 iteration: self.state.current_iteration,
500 status: "completed".to_string(),
501 learnings: self.extract_learnings(&response),
502 files_changed: Vec::new(),
503 timestamp: chrono::Utc::now().to_rfc3339(),
504 };
505 self.append_progress(&entry, &response)?;
506 let entry_learnings = entry.learnings.clone();
507 self.state.progress_log.push(entry);
508
509 if self.config.quality_checks_enabled {
511 if self.run_quality_gates_with_events(&story.id).await? {
512 info!("Story {} passed quality checks!", story.id);
513 self.state.prd.mark_passed(&story.id);
514
515 if let Some(ref store) = self.store {
517 let s = store.clone();
518 let rid = self.run_id.clone();
519 let entry = StoryResultEntry {
520 story_id: story.id.clone(),
521 title: story.title.clone(),
522 passed: true,
523 iteration: self.state.current_iteration,
524 error: None,
525 };
526 self.store_fire_and_forget(async move {
527 s.record_story_result(&rid, &entry).await
528 });
529 }
530
531 self.try_send_event(RalphEvent::StoryComplete {
532 story_id: story.id.clone(),
533 passed: true,
534 });
535
536 let next = self.state.prd.next_story().map(|s| s.id.clone());
538 self.bus_publish_story_result(
539 &story,
540 self.state.current_iteration,
541 &entry_learnings,
542 next.as_deref(),
543 );
544
545 if self.config.auto_commit {
547 self.commit_story(&story)?;
548 }
549
550 self.state.prd.save(&self.state.prd_path).await?;
552 } else {
553 warn!("Story {} failed quality checks", story.id);
554
555 if let Some(ref store) = self.store {
557 let s = store.clone();
558 let rid = self.run_id.clone();
559 let entry = StoryResultEntry {
560 story_id: story.id.clone(),
561 title: story.title.clone(),
562 passed: false,
563 iteration: self.state.current_iteration,
564 error: Some("Quality checks failed".to_string()),
565 };
566 self.store_fire_and_forget(async move {
567 s.record_story_result(&rid, &entry).await
568 });
569 }
570
571 let next = self.state.prd.next_story().map(|s| s.id.clone());
573 self.bus_publish_story_result(
574 &story,
575 self.state.current_iteration,
576 &entry_learnings,
577 next.as_deref(),
578 );
579
580 self.try_send_event(RalphEvent::StoryComplete {
581 story_id: story.id.clone(),
582 passed: false,
583 });
584 }
585 } else {
586 self.state.prd.mark_passed(&story.id);
588 self.state.prd.save(&self.state.prd_path).await?;
589
590 if let Some(ref store) = self.store {
592 let s = store.clone();
593 let rid = self.run_id.clone();
594 let entry = StoryResultEntry {
595 story_id: story.id.clone(),
596 title: story.title.clone(),
597 passed: true,
598 iteration: self.state.current_iteration,
599 error: None,
600 };
601 self.store_fire_and_forget(async move {
602 s.record_story_result(&rid, &entry).await
603 });
604 }
605
606 self.try_send_event(RalphEvent::StoryComplete {
607 story_id: story.id.clone(),
608 passed: true,
609 });
610 }
611 }
612 Err(e) => {
613 warn!("LLM call failed: {}", e);
614 let entry = ProgressEntry {
615 story_id: story.id.clone(),
616 iteration: self.state.current_iteration,
617 status: format!("failed: {}", e),
618 learnings: Vec::new(),
619 files_changed: Vec::new(),
620 timestamp: chrono::Utc::now().to_rfc3339(),
621 };
622 self.state.progress_log.push(entry);
623
624 self.try_send_event(RalphEvent::StoryError {
625 story_id: story.id.clone(),
626 error: format!("{}", e),
627 });
628 }
629 }
630 }
631
632 Ok(())
633 }
634
635 async fn run_parallel(&mut self) -> anyhow::Result<()> {
637 let stages: Vec<Vec<UserStory>> = self
639 .state
640 .prd
641 .stages()
642 .into_iter()
643 .map(|stage| stage.into_iter().cloned().collect())
644 .collect();
645 let total_stages = stages.len();
646
647 info!(
648 "Parallel execution: {} stages, {} max concurrent stories",
649 total_stages, self.config.max_concurrent_stories
650 );
651
652 let worktree_mgr = if self.config.worktree_enabled {
654 let mgr = WorktreeManager::new(&self.state.working_dir);
655 info!("Worktree isolation enabled for parallel stories");
656 Some(Arc::new(mgr))
657 } else {
658 None
659 };
660
661 for (stage_idx, stage_stories) in stages.into_iter().enumerate() {
662 if self.state.prd.is_complete() {
663 info!("All stories complete!");
664 self.state.status = RalphStatus::Completed;
665 break;
666 }
667
668 if self.state.current_iteration >= self.state.max_iterations {
669 break;
670 }
671
672 let story_count = stage_stories.len();
673 info!(
674 "=== Stage {}/{}: {} stories in parallel ===",
675 stage_idx + 1,
676 total_stages,
677 story_count
678 );
679
680 let stories: Vec<UserStory> = stage_stories;
682
683 let semaphore = Arc::new(tokio::sync::Semaphore::new(
685 self.config.max_concurrent_stories,
686 ));
687 let provider = Arc::clone(&self.provider);
688 let model = self.model.clone();
689 let prd_info = (
690 self.state.prd.project.clone(),
691 self.state.prd.feature.clone(),
692 );
693 let working_dir = self.state.working_dir.clone();
694 let progress_path = self.config.progress_path.clone();
695
696 let mut handles = Vec::new();
697
698 let accumulated_learnings = self.bus_collect_learnings();
700
701 for story in stories {
702 let sem = Arc::clone(&semaphore);
703 let provider = Arc::clone(&provider);
704 let model = model.clone();
705 let prd_info = prd_info.clone();
706 let working_dir = working_dir.clone();
707 let worktree_mgr = worktree_mgr.clone();
708 let progress_path = progress_path.clone();
709 let ralph_tx = self.event_tx.clone();
710 let stage_learnings = accumulated_learnings.clone();
711 let bus = self.bus.clone();
712 let relay_enabled = self.config.relay_enabled;
713 let relay_max_agents = self.config.relay_max_agents;
714 let relay_max_rounds = self.config.relay_max_rounds;
715 let registry = self.registry.clone();
716 let max_steps_per_story = self.config.max_steps_per_story;
717
718 let handle: tokio::task::JoinHandle<(
719 crate::ralph::types::UserStory,
720 bool,
721 crate::ralph::types::ProgressEntry,
722 Option<crate::worktree::WorktreeInfo>,
723 Option<std::sync::Arc<crate::worktree::WorktreeManager>>,
724 )> = tokio::spawn(async move {
725 let _permit = sem.acquire().await.expect("semaphore closed");
726
727 let (story_working_dir, worktree_info) = if let Some(ref mgr) = worktree_mgr {
729 match mgr.create(&story.id.to_lowercase().replace("-", "_")).await {
730 Ok(wt) => {
731 if let Err(e) = mgr.inject_workspace_stub(&wt.path) {
733 warn!(
734 story_id = %story.id,
735 error = %e,
736 "Failed to inject workspace stub"
737 );
738 }
739 info!(
740 story_id = %story.id,
741 worktree_path = %wt.path.display(),
742 "Created worktree for story"
743 );
744 (wt.path.clone(), Some(wt))
745 }
746 Err(e) => {
747 warn!(
748 story_id = %story.id,
749 error = %e,
750 "Failed to create worktree, using main directory"
751 );
752 (working_dir.clone(), None)
753 }
754 }
755 } else {
756 (working_dir.clone(), None)
757 };
758
759 info!(
760 "Working on story: {} - {} (in {:?})",
761 story.id, story.title, story_working_dir
762 );
763
764 if let Some(ref tx) = ralph_tx {
766 let _ = tx
767 .send(RalphEvent::StoryStarted {
768 story_id: story.id.clone(),
769 })
770 .await;
771 }
772
773 let mut prompt =
775 Self::build_story_prompt(&story, &prd_info, &story_working_dir);
776 if !stage_learnings.is_empty() {
777 prompt.push_str("\n## Learnings from Previous Stages:\n");
778 for l in &stage_learnings {
779 prompt.push_str(&format!("- {l}\n"));
780 }
781 }
782
783 let (bridge_tx, _bridge_handle) = if !relay_enabled {
785 if let Some(ref tx) = ralph_tx {
786 let (btx, handle) =
787 Self::create_swarm_event_bridge(tx, story.id.clone());
788 (Some(btx), Some(handle))
789 } else {
790 (None, None)
791 }
792 } else {
793 (None, None)
794 };
795
796 let result = if relay_enabled {
798 if let Some(ref reg) = registry {
799 Self::call_relay_static(
800 reg,
801 &model,
802 &prompt,
803 &story_working_dir,
804 ralph_tx.clone(),
805 story.id.clone(),
806 bus.clone(),
807 relay_max_agents,
808 relay_max_rounds,
809 )
810 .await
811 } else {
812 warn!(
813 story_id = %story.id,
814 "Relay enabled but no registry available, using single agent"
815 );
816 Self::call_llm_static(
817 &provider,
818 &model,
819 &prompt,
820 &story_working_dir,
821 bridge_tx,
822 story.id.clone(),
823 bus.clone(),
824 max_steps_per_story,
825 )
826 .await
827 }
828 } else {
829 Self::call_llm_static(
830 &provider,
831 &model,
832 &prompt,
833 &story_working_dir,
834 bridge_tx,
835 story.id.clone(),
836 bus.clone(),
837 max_steps_per_story,
838 )
839 .await
840 };
841
842 let entry = match &result {
843 Ok(response) => {
844 let progress_file = story_working_dir.join(&progress_path);
846 let _ = std::fs::write(&progress_file, response);
847
848 ProgressEntry {
849 story_id: story.id.clone(),
850 iteration: 1,
851 status: "completed".to_string(),
852 learnings: Self::extract_learnings_static(response),
853 files_changed: Vec::new(),
854 timestamp: chrono::Utc::now().to_rfc3339(),
855 }
856 }
857 Err(e) => {
858 warn!("LLM call failed for story {}: {}", story.id, e);
859 ProgressEntry {
860 story_id: story.id.clone(),
861 iteration: 1,
862 status: format!("failed: {}", e),
863 learnings: Vec::new(),
864 files_changed: Vec::new(),
865 timestamp: chrono::Utc::now().to_rfc3339(),
866 }
867 }
868 };
869
870 (story, result.is_ok(), entry, worktree_info, worktree_mgr)
871 });
872
873 handles.push(handle);
874 }
875
876 for handle in handles {
878 match handle.await {
879 Ok((story, success, entry, worktree_info, worktree_mgr)) => {
880 self.state.current_iteration += 1;
881 self.state.progress_log.push(entry);
882
883 if success {
884 let check_dir = worktree_info
886 .as_ref()
887 .map(|wt| wt.path.clone())
888 .unwrap_or_else(|| self.state.working_dir.clone());
889
890 let quality_passed = if self.config.quality_checks_enabled {
891 self.run_quality_gates_in_dir_with_events(&check_dir, &story.id)
892 .await
893 .unwrap_or(false)
894 } else {
895 true
896 };
897
898 if quality_passed {
899 info!("Story {} passed quality checks!", story.id);
900
901 if let Some(ref wt) = worktree_info {
903 let _ = Self::commit_in_dir(&wt.path, &story);
904 }
905
906 if let (Some(wt), Some(mgr)) = (worktree_info.as_ref(), worktree_mgr.as_ref()) {
908 match mgr.merge(&wt.name).await {
909 Ok(merge_result) => {
910 if merge_result.success {
911 info!(
912 story_id = %story.id,
913 files_changed = merge_result.files_changed,
914 "Merged story changes successfully"
915 );
916 self.state.prd.mark_passed(&story.id);
917 self.try_send_event(RalphEvent::StoryMerge {
918 story_id: story.id.clone(),
919 success: true,
920 summary: merge_result.summary.clone(),
921 });
922 self.try_send_event(RalphEvent::StoryComplete {
923 story_id: story.id.clone(),
924 passed: true,
925 });
926
927 if let Some(ref store) = self.store {
929 let s = store.clone();
930 let rid = self.run_id.clone();
931 let entry = StoryResultEntry {
932 story_id: story.id.clone(),
933 title: story.title.clone(),
934 passed: true,
935 iteration: self.state.current_iteration,
936 error: None,
937 };
938 self.store_fire_and_forget(async move {
939 s.record_story_result(&rid, &entry).await
940 });
941 }
942
943 let _ = mgr.cleanup(&wt.name).await;
945 } else if !merge_result.conflicts.is_empty() {
946 info!(
948 story_id = %story.id,
949 num_conflicts = merge_result.conflicts.len(),
950 "Spawning conflict resolver sub-agent"
951 );
952
953 match Self::resolve_conflicts_static(
955 &provider,
956 &model,
957 &working_dir,
958 &story,
959 &merge_result.conflicts,
960 &merge_result.conflict_diffs,
961 self.bus.clone(),
962 )
963 .await
964 {
965 Ok(resolved) => {
966 if resolved {
967 let commit_msg = format!(
969 "Merge: resolved conflicts for {}",
970 story.id
971 );
972 match mgr
973 .complete_merge(&wt.name, &commit_msg).await
974 {
975 Ok(final_result) => {
976 if final_result.success {
977 info!(
978 story_id = %story.id,
979 "Merge completed after conflict resolution"
980 );
981 self.state
982 .prd
983 .mark_passed(&story.id);
984 } else {
985 warn!(
986 story_id = %story.id,
987 "Merge failed even after resolution"
988 );
989 let _ = mgr.abort_merge(&wt.name).await;
990 }
991 }
992 Err(e) => {
993 warn!(
994 story_id = %story.id,
995 error = %e,
996 "Failed to complete merge after resolution"
997 );
998 let _ = mgr.abort_merge(&wt.name).await;
999 }
1000 }
1001 } else {
1002 warn!(
1003 story_id = %story.id,
1004 "Conflict resolver could not resolve all conflicts"
1005 );
1006 let _ = mgr.abort_merge(&wt.name).await;
1007 }
1008 }
1009 Err(e) => {
1010 warn!(
1011 story_id = %story.id,
1012 error = %e,
1013 "Conflict resolver failed"
1014 );
1015 let _ = mgr.abort_merge(&wt.name).await;
1016 }
1017 }
1018 let _ = mgr.cleanup(&wt.name).await;
1020 } else if merge_result.aborted {
1021 warn!(
1023 story_id = %story.id,
1024 summary = %merge_result.summary,
1025 "Merge was aborted due to non-conflict failure"
1026 );
1027 let _ = mgr.cleanup(&wt.name).await;
1029 } else {
1030 warn!(
1032 story_id = %story.id,
1033 summary = %merge_result.summary,
1034 "Merge failed but not aborted - manual intervention may be needed"
1035 );
1036 }
1038 }
1039 Err(e) => {
1040 warn!(
1041 story_id = %story.id,
1042 error = %e,
1043 "Failed to merge worktree"
1044 );
1045 }
1046 }
1047 } else {
1048 self.state.prd.mark_passed(&story.id);
1050 self.try_send_event(RalphEvent::StoryComplete {
1051 story_id: story.id.clone(),
1052 passed: true,
1053 });
1054
1055 if let Some(ref store) = self.store {
1057 let s = store.clone();
1058 let rid = self.run_id.clone();
1059 let entry = StoryResultEntry {
1060 story_id: story.id.clone(),
1061 title: story.title.clone(),
1062 passed: true,
1063 iteration: self.state.current_iteration,
1064 error: None,
1065 };
1066 self.store_fire_and_forget(async move {
1067 s.record_story_result(&rid, &entry).await
1068 });
1069 }
1070 }
1071 } else {
1072 warn!("Story {} failed quality checks", story.id);
1073 self.try_send_event(RalphEvent::StoryComplete {
1074 story_id: story.id.clone(),
1075 passed: false,
1076 });
1077 if let (Some(wt), Some(mgr)) = (worktree_info.as_ref(), worktree_mgr.as_ref()) {
1079 let _ = mgr.cleanup(&wt.name).await;
1080 }
1081 }
1082 } else {
1083 self.try_send_event(RalphEvent::StoryError {
1085 story_id: story.id.clone(),
1086 error: "LLM call failed".to_string(),
1087 });
1088 if let Some(ref wt) = worktree_info {
1089 info!(
1090 story_id = %story.id,
1091 worktree_path = %wt.path.display(),
1092 "Keeping worktree for debugging (story failed)"
1093 );
1094 }
1095 }
1096 }
1097 Err(e) => {
1098 warn!("Story execution task failed: {}", e);
1099 }
1100 }
1101 }
1102
1103 if self.bus.is_some() {
1105 let bus_learnings: Vec<String> = self
1106 .state
1107 .progress_log
1108 .iter()
1109 .flat_map(|e| e.learnings.clone())
1110 .collect();
1111 for story in &self.state.prd.user_stories {
1112 if story.passes {
1113 self.bus_publish_story_result(
1114 story,
1115 self.state.current_iteration,
1116 &bus_learnings,
1117 None,
1118 );
1119 }
1120 }
1121 }
1122
1123 self.state.prd.save(&self.state.prd_path).await?;
1125
1126 if let Some(ref store) = self.store {
1128 let s = store.clone();
1129 let rid = self.run_id.clone();
1130 let prd = self.state.prd.clone();
1131 let iter = self.state.current_iteration;
1132 self.store_fire_and_forget(async move {
1133 s.update_prd(&rid, &prd).await?;
1134 s.update_iteration(&rid, iter).await
1135 });
1136 }
1137 }
1138
1139 Ok(())
1140 }
1141
1142 fn build_story_prompt(
1144 story: &UserStory,
1145 prd_info: &(String, String),
1146 working_dir: &PathBuf,
1147 ) -> String {
1148 let wd = working_dir.display();
1149 format!(
1150 r#"# PRD: {} - {}
1151
1152## Working Directory: {wd}
1153
1154## Current Story: {} - {}
1155
1156{}
1157
1158### Acceptance Criteria:
1159{}
1160
1161## WORKFLOW (follow this exactly):
1162
11631. **EXPLORE** (2-4 tool calls): Use `glob` and `read` to understand existing code
11642. **IMPLEMENT** (5-15 tool calls): Use `write` or `edit` to make changes
11653. **VERIFY**: Run a verification command appropriate to the language (see below)
11664. **FIX OR FINISH**:
1167 - If no errors: Output `STORY_COMPLETE: {}` and STOP
1168 - If errors: Parse the error, fix it, re-verify (max 3 fix attempts)
1169 - After 3 failed attempts: Output `STORY_BLOCKED: <error summary>` and STOP
1170
1171## VERIFICATION BY LANGUAGE:
1172- **Rust**: `bash` with `cargo check 2>&1`
1173- **Python**: `bash` with `python -c "import ast; ast.parse(open('FILE').read())"` for each changed file
1174- **TypeScript/JavaScript**: `bash` with `npx tsc --noEmit` (if tsconfig.json exists)
1175- Choose the right check for the files you modified
1176
1177## TOOL USAGE:
1178- `read`: Read file content (always read before editing!)
1179- `edit`: Modify files (MUST include 3+ lines before/after for unique context)
1180- `write`: Create new files
1181- `bash`: Run commands with `{{"command": "...", "cwd": "{wd}"}}`
1182
1183## CRITICAL RULES:
1184- ALWAYS read a file before editing it
1185- When edit fails with "ambiguous match", include MORE context lines
1186- Do NOT add TODO/placeholder comments
1187- Count your fix attempts - STOP after 3 failures
1188
1189## TERMINATION:
1190SUCCESS: Output `STORY_COMPLETE: {}`
1191BLOCKED: Output `STORY_BLOCKED: <brief error description>`
1192
1193Do NOT keep iterating indefinitely. Stop when done or blocked.
1194"#,
1195 prd_info.0,
1196 prd_info.1,
1197 story.id,
1198 story.title,
1199 story.description,
1200 story
1201 .acceptance_criteria
1202 .iter()
1203 .map(|c| format!("- {}", c))
1204 .collect::<Vec<_>>()
1205 .join("\n"),
1206 story.id,
1207 story.id
1208 )
1209 }
1210
1211 async fn call_llm_static(
1213 provider: &Arc<dyn Provider>,
1214 model: &str,
1215 prompt: &str,
1216 working_dir: &PathBuf,
1217 event_tx: Option<mpsc::Sender<SwarmEvent>>,
1218 story_id: String,
1219 bus: Option<Arc<AgentBus>>,
1220 max_steps_per_story: usize,
1221 ) -> anyhow::Result<String> {
1222 let system_prompt = crate::agent::builtin::build_system_prompt(working_dir);
1224
1225 let tool_registry =
1227 ToolRegistry::with_provider_arc(Arc::clone(provider), model.to_string());
1228
1229 let tool_definitions: Vec<_> = tool_registry
1231 .definitions()
1232 .into_iter()
1233 .filter(|t| t.name != "question")
1234 .collect();
1235
1236 info!(
1237 "Ralph sub-agent starting with {} tools in {:?}",
1238 tool_definitions.len(),
1239 working_dir
1240 );
1241
1242 let max_steps = max_steps_per_story;
1244 let (output, steps, tool_calls, _exit_reason) = run_agent_loop(
1245 Arc::clone(provider),
1246 model,
1247 &system_prompt,
1248 prompt,
1249 tool_definitions,
1250 tool_registry, max_steps,
1252 180, event_tx,
1254 story_id,
1255 bus.clone(),
1256 Some(working_dir.clone()),
1257 )
1258 .await?;
1259
1260 info!(
1261 "Ralph sub-agent completed: {} steps, {} tool calls",
1262 steps, tool_calls
1263 );
1264
1265 Ok(output)
1266 }
1267
/// Builds the relay agent profiles for implementing a story in `working_dir`.
///
/// Each profile is `(name, instructions, capability tags)`. The planner, coder
/// and reviewer are always built; a tester is added when `max_agents >= 4` and
/// a refactorer when `max_agents >= 5`. The list is finally cut down to at
/// most `max_agents` entries, so values below 3 drop profiles from the end.
fn build_implementation_relay_profiles(
    max_agents: usize,
    working_dir: &std::path::Path,
) -> Vec<(String, String, Vec<String>)> {
    /// Turns a list of tag literals into owned strings.
    fn tags(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| String::from(*name)).collect()
    }

    let wd = working_dir.display();

    let planner = (
        String::from("auto-planner"),
        format!(
            "You are @auto-planner.\n\
             Specialty: Code analysis and implementation planning.\n\
             Mission: Read the codebase to understand existing patterns, then produce a concrete step-by-step implementation plan.\n\
             Working directory: {wd}\n\n\
             Read existing code first using the `read` tool. Identify all files that need changes.\n\
             Produce a numbered list of specific changes with file paths and code snippets.\n\
             Pass your plan in a clear format the next agent can follow step by step."
        ),
        tags(&["planning", "analysis", "relay"]),
    );

    let coder = (
        String::from("auto-coder"),
        format!(
            "You are @auto-coder.\n\
             Specialty: Code implementation.\n\
             Mission: Implement changes according to the plan, write production code, verify it compiles.\n\
             Working directory: {wd}\n\n\
             Follow the plan from the incoming handoff. Use `edit` and `write` tools to make changes.\n\
             After implementing, run `bash` with `cargo check 2>&1` (cwd: {wd}) to verify.\n\
             Fix any compilation errors. Report what you implemented."
        ),
        tags(&["implementation", "coding", "relay"]),
    );

    let reviewer = (
        String::from("auto-reviewer"),
        format!(
            "You are @auto-reviewer.\n\
             Specialty: Code review and quality verification.\n\
             Mission: Review implemented changes, run quality checks, fix remaining issues.\n\
             Working directory: {wd}\n\n\
             Review the code changes from the incoming handoff. Run `bash` with `cargo check 2>&1` (cwd: {wd}).\n\
             Fix any remaining issues. Verify the implementation meets the acceptance criteria.\n\
             If complete, include STORY_COMPLETE in your output. If blocked, include STORY_BLOCKED."
        ),
        tags(&["review", "verification", "relay"]),
    );

    let mut profiles = vec![planner, coder, reviewer];

    if max_agents >= 4 {
        let tester = (
            String::from("auto-tester"),
            format!(
                "You are @auto-tester.\n\
                 Specialty: Test writing and verification.\n\
                 Mission: Write tests for the implemented changes and ensure they pass.\n\
                 Working directory: {wd}\n\n\
                 Review the implementation, write appropriate tests, run them with `bash`.\n\
                 Report test results and coverage gaps."
            ),
            tags(&["testing", "relay"]),
        );
        profiles.push(tester);
    }

    if max_agents >= 5 {
        let refactorer = (
            String::from("auto-refactorer"),
            format!(
                "You are @auto-refactorer.\n\
                 Specialty: Code quality and lint compliance.\n\
                 Mission: Run clippy, fix warnings, improve code quality without changing behavior.\n\
                 Working directory: {wd}\n\n\
                 Run `bash` with `cargo clippy --all-features 2>&1` (cwd: {wd}).\n\
                 Fix warnings and improve naming, structure, error handling."
            ),
            tags(&["quality", "relay"]),
        );
        profiles.push(refactorer);
    }

    // Enforces the cap for values below 3 as well.
    profiles.truncate(max_agents);
    profiles
}
1349
/// Reduces `text` to a compact fingerprint used to compare relay outputs.
///
/// ASCII letters and digits are kept in lower case. A whitespace character is
/// kept as a single space unless the most recently kept character was already
/// a space. Every other character is dropped. Scanning stops as soon as 280
/// characters have been kept, and the result is trimmed before returning.
fn normalize_for_relay(text: &str) -> String {
    const FINGERPRINT_LIMIT: usize = 280;

    let mut fingerprint = String::with_capacity(text.len().min(512));
    let mut prev_kept_space = false;
    let mut remaining = text.chars();

    // Only single-byte ASCII is ever pushed, so the byte length equals the
    // number of kept characters.
    while fingerprint.len() < FINGERPRINT_LIMIT {
        let Some(ch) = remaining.next() else {
            break;
        };

        match ch {
            c if c.is_ascii_alphanumeric() => {
                fingerprint.push(c.to_ascii_lowercase());
                prev_kept_space = false;
            }
            c if c.is_whitespace() && !prev_kept_space => {
                fingerprint.push(' ');
                prev_kept_space = true;
            }
            _ => {}
        }
    }

    fingerprint.trim().to_string()
}
1368
/// Wraps one agent's output in the prompt handed to the next relay agent.
///
/// The prompt repeats `original_task`, then quotes `output` as the incoming
/// handoff from `@from_agent`, then asks the receiver to continue the work.
///
/// Outputs longer than 6000 characters are cut to their first 6000
/// characters and followed by a `... (truncated)` marker. The limit is
/// measured in characters, not bytes, so multi-byte text that fits within
/// the limit is passed through untouched and never gets a misleading marker.
fn prepare_relay_handoff(from_agent: &str, output: &str, original_task: &str) -> String {
    const MAX_PAYLOAD_CHARS: usize = 6_000;

    // `nth(MAX_PAYLOAD_CHARS)` yields the first character past the limit, if
    // there is one; its byte offset is a valid place to cut the string.
    let relay_payload = match output.char_indices().nth(MAX_PAYLOAD_CHARS) {
        Some((cut, _)) => format!("{}\n... (truncated)", &output[..cut]),
        None => output.to_string(),
    };

    format!(
        "Original task:\n{original_task}\n\n\
         Incoming handoff from @{from_agent}:\n{relay_payload}\n\n\
         Continue the work from this handoff. Keep your response focused and provide concrete next steps."
    )
}
1385
1386 async fn call_relay_static(
1393 registry: &Arc<ProviderRegistry>,
1394 model: &str,
1395 prompt: &str,
1396 working_dir: &PathBuf,
1397 ralph_tx: Option<mpsc::Sender<RalphEvent>>,
1398 story_id: String,
1399 bus: Option<Arc<AgentBus>>,
1400 max_agents: usize,
1401 max_rounds: usize,
1402 ) -> anyhow::Result<String> {
1403 let max_agents = max_agents.clamp(2, 8);
1404 let max_rounds = max_rounds.clamp(1, 5);
1405
1406 let profiles = Self::build_implementation_relay_profiles(max_agents, working_dir);
1407
1408 let relay_bus = bus.unwrap_or_else(|| Arc::new(AgentBus::new()));
1410 let relay = ProtocolRelayRuntime::new(relay_bus.clone());
1411
1412 let mut sessions: HashMap<String, Session> = HashMap::new();
1413 let mut ordered_agents: Vec<String> = Vec::new();
1414 let mut relay_profiles: Vec<RelayAgentProfile> = Vec::new();
1415
1416 for (name, instructions, capabilities) in &profiles {
1417 let mut session = Session::new().await?;
1418 session.metadata.model = Some(model.to_string());
1419 session.agent = name.clone();
1420 session.bus = Some(relay_bus.clone());
1421 session.add_message(Message {
1422 role: Role::System,
1423 content: vec![ContentPart::Text {
1424 text: instructions.clone(),
1425 }],
1426 });
1427
1428 relay_profiles.push(RelayAgentProfile {
1429 name: name.clone(),
1430 capabilities: capabilities.clone(),
1431 });
1432 ordered_agents.push(name.clone());
1433 sessions.insert(name.clone(), session);
1434 }
1435
1436 relay.register_agents(&relay_profiles);
1437
1438 info!(
1439 story_id = %story_id,
1440 agents = ordered_agents.len(),
1441 rounds = max_rounds,
1442 "Starting relay team for story"
1443 );
1444
1445 let mut baton = prompt.to_string();
1446 let mut previous_normalized: Option<String> = None;
1447 let mut convergence_hits = 0usize;
1448 let mut turns = 0usize;
1449
1450 'relay_loop: for round in 1..=max_rounds {
1451 for idx in 0..ordered_agents.len() {
1452 let to = ordered_agents[idx].clone();
1453 let from = if idx == 0 {
1454 if round == 1 {
1455 "user".to_string()
1456 } else {
1457 ordered_agents[ordered_agents.len() - 1].clone()
1458 }
1459 } else {
1460 ordered_agents[idx - 1].clone()
1461 };
1462
1463 turns += 1;
1464 relay.send_handoff(&from, &to, &baton);
1465
1466 if let Some(ref tx) = ralph_tx {
1467 let _ = tx
1468 .send(RalphEvent::StoryToolCall {
1469 story_id: story_id.clone(),
1470 tool_name: format!(
1471 "relay: @{from} → @{to} (round {round}/{max_rounds})"
1472 ),
1473 })
1474 .await;
1475 }
1476
1477 let Some(mut session) = sessions.remove(&to) else {
1478 anyhow::bail!("Relay agent @{to} session unavailable for story {story_id}");
1479 };
1480
1481 let (event_tx, mut event_rx) = mpsc::channel::<SessionEvent>(256);
1482 let registry_clone = registry.clone();
1483 let baton_clone = baton.clone();
1484
1485 let join = tokio::spawn(async move {
1486 let result = session
1487 .prompt_with_events(&baton_clone, event_tx, registry_clone)
1488 .await;
1489 (session, result)
1490 });
1491
1492 while !join.is_finished() {
1494 while let Ok(event) = event_rx.try_recv() {
1495 if let Some(ref tx) = ralph_tx {
1496 if let SessionEvent::ToolCallStart { ref name, .. } = event {
1497 let _ = tx
1498 .send(RalphEvent::StoryToolCall {
1499 story_id: story_id.clone(),
1500 tool_name: format!("@{to}: {name}"),
1501 })
1502 .await;
1503 }
1504 }
1505 }
1506 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1507 }
1508
1509 let (updated_session, result) = join
1510 .await
1511 .map_err(|e| anyhow::anyhow!("Relay agent @{to} task error: {e}"))?;
1512
1513 while event_rx.try_recv().is_ok() {}
1515
1516 sessions.insert(to.clone(), updated_session);
1517
1518 let output = result
1519 .map_err(|e| {
1520 anyhow::anyhow!("Relay agent @{to} failed on story {story_id}: {e}")
1521 })?
1522 .text;
1523
1524 let normalized = Self::normalize_for_relay(&output);
1526 if previous_normalized.as_deref() == Some(normalized.as_str()) {
1527 convergence_hits += 1;
1528 } else {
1529 convergence_hits = 0;
1530 }
1531 previous_normalized = Some(normalized);
1532
1533 if convergence_hits >= 2 {
1534 info!(story_id = %story_id, turns, "Relay converged");
1535 baton = output;
1536 break 'relay_loop;
1537 }
1538
1539 if output.contains("STORY_COMPLETE") || output.contains("STORY_BLOCKED") {
1541 info!(story_id = %story_id, turns, "Story reached terminal state via relay");
1542 baton = output;
1543 break 'relay_loop;
1544 }
1545
1546 baton = Self::prepare_relay_handoff(&to, &output, prompt);
1548 }
1549 }
1550
1551 relay.shutdown_agents(&ordered_agents);
1552
1553 info!(
1554 story_id = %story_id,
1555 turns,
1556 convergence_hits,
1557 "Relay team completed"
1558 );
1559
1560 Ok(baton)
1561 }
1562
1563 async fn resolve_conflicts_static(
1565 provider: &Arc<dyn Provider>,
1566 model: &str,
1567 working_dir: &PathBuf,
1568 story: &UserStory,
1569 conflicts: &[String],
1570 conflict_diffs: &[(String, String)],
1571 bus: Option<Arc<AgentBus>>,
1572 ) -> anyhow::Result<bool> {
1573 info!(
1574 story_id = %story.id,
1575 num_conflicts = conflicts.len(),
1576 "Starting conflict resolution sub-agent"
1577 );
1578
1579 let conflict_info = conflict_diffs
1581 .iter()
1582 .map(|(file, diff)| format!("### File: {}\n```diff\n{}\n```", file, diff))
1583 .collect::<Vec<_>>()
1584 .join("\n\n");
1585
1586 let prompt = format!(
1587 r#"# CONFLICT RESOLUTION TASK
1588
1589## Story Context: {} - {}
1590{}
1591
1592## Conflicting Files
1593The following files have merge conflicts that need resolution:
1594{}
1595
1596## Conflict Details
1597{}
1598
1599## Your Task
16001. Read each conflicting file to see the conflict markers
16012. Understand what BOTH sides are trying to do:
1602 - HEAD (main branch): the current state
1603 - The incoming branch: the sub-agent's changes for story {}
16043. Resolve each conflict by:
1605 - Keeping BOTH changes if they don't actually conflict
1606 - Merging the logic if they touch the same code
1607 - Preferring the sub-agent's changes if they implement the story requirement
16084. Remove ALL conflict markers (<<<<<<<, =======, >>>>>>>)
16095. Ensure the final code compiles: run `cargo check`
1610
1611## CRITICAL RULES
1612- Do NOT leave any conflict markers in files
1613- Do NOT just pick one side - understand and merge the intent
1614- MUST run `cargo check` after resolving to verify
1615- Stage resolved files with `git add <file>`
1616
1617## Termination
1618SUCCESS: Output `CONFLICTS_RESOLVED` when all files are resolved and compile
1619FAILED: Output `CONFLICTS_UNRESOLVED: <reason>` if you cannot resolve
1620
1621Working directory: {}
1622"#,
1623 story.id,
1624 story.title,
1625 story.description,
1626 conflicts
1627 .iter()
1628 .map(|f| format!("- {}", f))
1629 .collect::<Vec<_>>()
1630 .join("\n"),
1631 conflict_info,
1632 story.id,
1633 working_dir.display()
1634 );
1635
1636 let system_prompt = crate::agent::builtin::build_system_prompt(working_dir);
1638
1639 let tool_registry =
1641 ToolRegistry::with_provider_arc(Arc::clone(provider), model.to_string());
1642
1643 let tool_definitions: Vec<_> = tool_registry
1644 .definitions()
1645 .into_iter()
1646 .filter(|t| t.name != "question")
1647 .collect();
1648
1649 info!(
1650 "Conflict resolver starting with {} tools",
1651 tool_definitions.len()
1652 );
1653
1654 let (output, steps, tool_calls, _exit_reason) = run_agent_loop(
1656 Arc::clone(provider),
1657 model,
1658 &system_prompt,
1659 &prompt,
1660 tool_definitions,
1661 tool_registry,
1662 15, 120, None,
1665 String::new(),
1666 bus.clone(),
1667 Some(working_dir.to_path_buf()),
1668 )
1669 .await?;
1670
1671 info!(
1672 story_id = %story.id,
1673 steps = steps,
1674 tool_calls = tool_calls,
1675 "Conflict resolver completed"
1676 );
1677
1678 let resolved = output.contains("CONFLICTS_RESOLVED")
1680 || (output.contains("resolved") && !output.contains("UNRESOLVED"));
1681
1682 if resolved {
1683 info!(story_id = %story.id, "Conflicts resolved successfully");
1684 } else {
1685 warn!(
1686 story_id = %story.id,
1687 output = %output.chars().take(200).collect::<String>(),
1688 "Conflict resolution may have failed"
1689 );
1690 }
1691
1692 Ok(resolved)
1693 }
1694
/// Collects the lines of `response` that look like recorded learnings.
///
/// A line qualifies when it contains `learned`, `Learning` or `# What`
/// (case-sensitive). Qualifying lines are returned trimmed, in their
/// original order.
fn extract_learnings_static(response: &str) -> Vec<String> {
    const MARKERS: [&str; 3] = ["learned", "Learning", "# What"];

    let mut learnings = Vec::new();
    for line in response.lines() {
        let mentions_learning = MARKERS.iter().any(|marker| line.contains(marker));
        if mentions_learning {
            learnings.push(line.trim().to_string());
        }
    }
    learnings
}
1705
1706 fn commit_in_dir(dir: &PathBuf, story: &UserStory) -> anyhow::Result<()> {
1708 let _ = Command::new("git")
1710 .args(["add", "-A"])
1711 .current_dir(dir)
1712 .output();
1713
1714 let msg = format!("feat({}): {}", story.id.to_lowercase(), story.title);
1716 let _ = Command::new("git")
1717 .args(["commit", "-m", &msg])
1718 .current_dir(dir)
1719 .output();
1720
1721 Ok(())
1722 }
1723
1724 fn find_cargo_root(dir: &PathBuf) -> PathBuf {
1729 fn has_cargo_toml(path: &PathBuf) -> bool {
1730 path.join("Cargo.toml").exists()
1731 }
1732
1733 if has_cargo_toml(dir) {
1735 return dir.clone();
1736 }
1737
1738 if let Ok(entries) = std::fs::read_dir(dir) {
1740 let mut subdirs: Vec<_> = entries
1741 .filter_map(|e| e.ok())
1742 .filter(|e| e.path().is_dir())
1743 .map(|e| e.path())
1744 .collect();
1745
1746 subdirs.sort();
1748
1749 for subdir in subdirs {
1750 if has_cargo_toml(&subdir) {
1751 return subdir;
1752 }
1753
1754 if let Ok(sub_entries) = std::fs::read_dir(&subdir) {
1756 let mut sub_subdirs: Vec<_> = sub_entries
1757 .filter_map(|e| e.ok())
1758 .filter(|e| e.path().is_dir())
1759 .map(|e| e.path())
1760 .collect();
1761
1762 sub_subdirs.sort();
1763
1764 for sub_subdir in sub_subdirs {
1765 if has_cargo_toml(&sub_subdir) {
1766 return sub_subdir;
1767 }
1768 }
1769 }
1770 }
1771 }
1772
1773 warn!(
1775 dir = %dir.display(),
1776 "No Cargo.toml found, using worktree root for cargo commands"
1777 );
1778 dir.clone()
1779 }
1780
1781 async fn run_quality_gates_in_dir_with_events(
1783 &self,
1784 dir: &PathBuf,
1785 story_id: &str,
1786 ) -> anyhow::Result<bool> {
1787 let cargo_root = Self::find_cargo_root(dir);
1789 debug!(
1790 worktree_dir = %dir.display(),
1791 cargo_root = %cargo_root.display(),
1792 "Running quality gates"
1793 );
1794
1795 let checks = &self.state.prd.quality_checks;
1796 let mut all_passed = true;
1797
1798 for (name, cmd) in [
1799 ("typecheck", &checks.typecheck),
1800 ("lint", &checks.lint),
1801 ("test", &checks.test),
1802 ("build", &checks.build),
1803 ] {
1804 if let Some(command) = cmd {
1805 debug!("Running {} check in {:?}: {}", name, cargo_root, command);
1806 let output = Command::new("/bin/sh")
1807 .arg("-c")
1808 .arg(command)
1809 .current_dir(&cargo_root)
1810 .output()
1811 .map_err(|e| {
1812 anyhow::anyhow!("Failed to run quality check '{}': {}", name, e)
1813 })?;
1814
1815 let passed = output.status.success();
1816 self.try_send_event(RalphEvent::StoryQualityCheck {
1817 story_id: story_id.to_string(),
1818 check_name: name.to_string(),
1819 passed,
1820 });
1821
1822 if !passed {
1823 let stderr = String::from_utf8_lossy(&output.stderr);
1824 let stdout = String::from_utf8_lossy(&output.stdout);
1825 let combined = format!("{}\n{}", stdout, stderr);
1826 let error_summary: String = combined
1827 .lines()
1828 .filter(|line| {
1829 line.starts_with("error")
1830 || line.contains("error:")
1831 || line.contains("error[")
1832 })
1833 .take(5)
1834 .collect::<Vec<_>>()
1835 .join("\n");
1836 warn!(
1837 check = %name,
1838 cargo_root = %cargo_root.display(),
1839 error_summary = %error_summary.chars().take(300).collect::<String>(),
1840 "{} check failed in {:?}",
1841 name,
1842 cargo_root
1843 );
1844 all_passed = false;
1845 break; }
1847 }
1848 }
1849
1850 Ok(all_passed)
1851 }
1852
1853 fn build_prompt(&self, story: &UserStory) -> String {
1855 let progress = self.load_progress().unwrap_or_default();
1856
1857 format!(
1858 r#"# PRD: {} - {}
1859
1860## Current Story: {} - {}
1861
1862{}
1863
1864### Acceptance Criteria:
1865{}
1866
1867## Previous Progress:
1868{}
1869
1870## Instructions:
18711. Implement the requirements for this story
18722. Write any necessary code changes
18733. Document what you learned
18744. End with `STORY_COMPLETE: {}` when done
1875
1876Respond with the implementation and any shell commands needed.
1877"#,
1878 self.state.prd.project,
1879 self.state.prd.feature,
1880 story.id,
1881 story.title,
1882 story.description,
1883 story
1884 .acceptance_criteria
1885 .iter()
1886 .map(|c| format!("- {}", c))
1887 .collect::<Vec<_>>()
1888 .join("\n"),
1889 if progress.is_empty() {
1890 "None yet".to_string()
1891 } else {
1892 progress
1893 },
1894 story.id
1895 )
1896 }
1897
1898 async fn call_llm(&self, story_id: &str, prompt: &str) -> anyhow::Result<String> {
1900 let system_prompt = crate::agent::builtin::build_system_prompt(&self.state.working_dir);
1902
1903 let tool_registry =
1905 ToolRegistry::with_provider_arc(Arc::clone(&self.provider), self.model.clone());
1906
1907 let tool_definitions: Vec<_> = tool_registry
1909 .definitions()
1910 .into_iter()
1911 .filter(|t| t.name != "question")
1912 .collect();
1913
1914 info!(
1915 "Ralph agent starting with {} tools in {:?}",
1916 tool_definitions.len(),
1917 self.state.working_dir
1918 );
1919
1920 let (bridge_tx, _bridge_handle) = if let Some(ref ralph_tx) = self.event_tx {
1922 let (tx, handle) = Self::create_swarm_event_bridge(ralph_tx, story_id.to_string());
1923 (Some(tx), Some(handle))
1924 } else {
1925 (None, None)
1926 };
1927
1928 let (output, steps, tool_calls, exit_reason) = run_agent_loop(
1930 Arc::clone(&self.provider),
1931 &self.model,
1932 &system_prompt,
1933 prompt,
1934 tool_definitions,
1935 tool_registry, 30, 180, bridge_tx,
1939 story_id.to_string(),
1940 self.bus.clone(),
1941 Some(self.state.working_dir.clone()),
1942 )
1943 .await?;
1944
1945 match exit_reason {
1947 AgentLoopExit::Completed => {
1948 info!(
1949 "Ralph agent completed: {} steps, {} tool calls",
1950 steps, tool_calls
1951 );
1952 }
1953 AgentLoopExit::MaxStepsReached => {
1954 warn!(
1955 story_id = %story_id,
1956 steps = steps,
1957 tool_calls = tool_calls,
1958 "Ralph sub-agent hit max steps limit - work may be incomplete"
1959 );
1960 }
1961 AgentLoopExit::TimedOut => {
1962 warn!(
1963 story_id = %story_id,
1964 steps = steps,
1965 tool_calls = tool_calls,
1966 "Ralph sub-agent timed out - work may be incomplete"
1967 );
1968 }
1969 }
1970
1971 Ok(output)
1972 }
1973
1974 async fn run_quality_gates_with_events(&self, story_id: &str) -> anyhow::Result<bool> {
1976 let cargo_root = Self::find_cargo_root(&self.state.working_dir);
1978 debug!(
1979 working_dir = %self.state.working_dir.display(),
1980 cargo_root = %cargo_root.display(),
1981 "Running quality gates"
1982 );
1983
1984 let checks = &self.state.prd.quality_checks;
1985 let mut all_passed = true;
1986
1987 for (name, cmd) in [
1988 ("typecheck", &checks.typecheck),
1989 ("lint", &checks.lint),
1990 ("test", &checks.test),
1991 ("build", &checks.build),
1992 ] {
1993 if let Some(command) = cmd {
1994 debug!("Running {} check in {:?}: {}", name, cargo_root, command);
1995 let output = Command::new("/bin/sh")
1996 .arg("-c")
1997 .arg(command)
1998 .current_dir(&cargo_root)
1999 .output()
2000 .map_err(|e| {
2001 anyhow::anyhow!("Failed to run quality check '{}': {}", name, e)
2002 })?;
2003
2004 let passed = output.status.success();
2005 self.try_send_event(RalphEvent::StoryQualityCheck {
2006 story_id: story_id.to_string(),
2007 check_name: name.to_string(),
2008 passed,
2009 });
2010
2011 if !passed {
2012 let stderr = String::from_utf8_lossy(&output.stderr);
2013 let stdout = String::from_utf8_lossy(&output.stdout);
2014 let combined = format!("{}\n{}", stdout, stderr);
2015 let error_summary: String = combined
2016 .lines()
2017 .filter(|line| {
2018 line.starts_with("error")
2019 || line.contains("error:")
2020 || line.contains("error[")
2021 })
2022 .take(5)
2023 .collect::<Vec<_>>()
2024 .join("\n");
2025 warn!(
2026 check = %name,
2027 cargo_root = %cargo_root.display(),
2028 error_summary = %error_summary.chars().take(300).collect::<String>(),
2029 "{} check failed in {:?}",
2030 name, cargo_root
2031 );
2032 all_passed = false;
2033 break; }
2035 }
2036 }
2037
2038 Ok(all_passed)
2039 }
2040
2041 fn commit_story(&self, story: &UserStory) -> anyhow::Result<()> {
2043 info!("Committing changes for story: {}", story.id);
2044
2045 let _ = Command::new("git")
2047 .args(["add", "-A"])
2048 .current_dir(&self.state.working_dir)
2049 .output();
2050
2051 let msg = format!("feat({}): {}", story.id.to_lowercase(), story.title);
2053 match Command::new("git")
2054 .args(["commit", "-m", &msg])
2055 .current_dir(&self.state.working_dir)
2056 .output()
2057 {
2058 Ok(output) if output.status.success() => {
2059 info!("Committed: {}", msg);
2060 }
2061 Ok(output) => {
2062 warn!(
2063 "Git commit had no changes or failed: {}",
2064 String::from_utf8_lossy(&output.stderr)
2065 );
2066 }
2067 Err(e) => {
2068 warn!("Could not run git commit: {}", e);
2069 }
2070 }
2071
2072 Ok(())
2073 }
2074
2075 fn git_checkout(&self, branch: &str) -> anyhow::Result<()> {
2077 let output = Command::new("git")
2079 .args(["checkout", branch])
2080 .current_dir(&self.state.working_dir)
2081 .output()?;
2082
2083 if !output.status.success() {
2084 Command::new("git")
2085 .args(["checkout", "-b", branch])
2086 .current_dir(&self.state.working_dir)
2087 .output()?;
2088 }
2089
2090 Ok(())
2091 }
2092
2093 fn load_progress(&self) -> anyhow::Result<String> {
2095 let path = self.state.working_dir.join(&self.config.progress_path);
2096 Ok(std::fs::read_to_string(path).unwrap_or_default())
2097 }
2098
2099 fn append_progress(&self, entry: &ProgressEntry, response: &str) -> anyhow::Result<()> {
2101 let path = self.state.working_dir.join(&self.config.progress_path);
2102 let mut content = self.load_progress().unwrap_or_default();
2103
2104 content.push_str(&format!(
2105 "\n---\n\n## Iteration {} - {} ({})\n\n**Status:** {}\n\n### Summary\n{}\n",
2106 entry.iteration, entry.story_id, entry.timestamp, entry.status, response
2107 ));
2108
2109 std::fs::write(path, content)?;
2110 Ok(())
2111 }
2112
2113 fn extract_learnings(&self, response: &str) -> Vec<String> {
2115 let mut learnings = Vec::new();
2116
2117 for line in response.lines() {
2118 if line.contains("learned") || line.contains("Learning") || line.contains("# What") {
2119 learnings.push(line.trim().to_string());
2120 }
2121 }
2122
2123 learnings
2124 }
2125
2126 pub fn status(&self) -> &RalphState {
2128 &self.state
2129 }
2130
2131 pub fn status_markdown(&self) -> String {
2133 let status = if self.state.prd.is_complete() {
2134 "# Ralph Complete!"
2135 } else {
2136 "# Ralph Status"
2137 };
2138
2139 let stories: Vec<String> = self
2140 .state
2141 .prd
2142 .user_stories
2143 .iter()
2144 .map(|s| {
2145 let check = if s.passes { "[x]" } else { "[ ]" };
2146 format!("- {} {}: {}", check, s.id, s.title)
2147 })
2148 .collect();
2149
2150 format!(
2151 "{}\n\n**Project:** {}\n**Feature:** {}\n**Progress:** {}/{} stories\n**Iterations:** {}/{}\n\n## Stories\n{}",
2152 status,
2153 self.state.prd.project,
2154 self.state.prd.feature,
2155 self.state.prd.passed_count(),
2156 self.state.prd.user_stories.len(),
2157 self.state.current_iteration,
2158 self.state.max_iterations,
2159 stories.join("\n")
2160 )
2161 }
2162}
2163
2164pub fn create_prd_template(project: &str, feature: &str) -> Prd {
2166 Prd {
2167 project: project.to_string(),
2168 feature: feature.to_string(),
2169 branch_name: format!("feature/{}", feature.to_lowercase().replace(' ', "-")),
2170 version: "1.0".to_string(),
2171 user_stories: vec![UserStory {
2172 id: "US-001".to_string(),
2173 title: "First user story".to_string(),
2174 description: "Description of what needs to be implemented".to_string(),
2175 acceptance_criteria: vec!["Criterion 1".to_string(), "Criterion 2".to_string()],
2176 verification_steps: Vec::new(),
2177 passes: false,
2178 priority: 1,
2179 depends_on: Vec::new(),
2180 complexity: 3,
2181 }],
2182 technical_requirements: Vec::new(),
2183 quality_checks: QualityChecks {
2184 typecheck: Some("cargo check".to_string()),
2185 test: Some("cargo test".to_string()),
2186 lint: Some("cargo clippy".to_string()),
2187 build: Some("cargo build".to_string()),
2188 },
2189 created_at: chrono::Utc::now().to_rfc3339(),
2190 updated_at: chrono::Utc::now().to_rfc3339(),
2191 }
2192}