1use crate::error::{CliError, Result};
4use colored::Colorize;
5use miyabi_agents::base::BaseAgent;
6use miyabi_agents::CoordinatorAgentWithLLM;
7use miyabi_types::{AgentConfig, AgentType, Task};
8use miyabi_worktree::{PoolConfig, WorktreePool, WorktreeTask};
9use std::collections::HashMap;
10
/// CLI command that processes several GitHub issues concurrently, each
/// inside its own git worktree managed by a `WorktreePool`.
pub struct ParallelCommand {
    /// GitHub issue numbers to process.
    pub issues: Vec<u64>,
    /// Maximum number of issues processed at the same time (must be >= 1;
    /// validated in `execute`, not at construction).
    pub concurrency: usize,
}
15
16impl ParallelCommand {
17 pub fn new(issues: Vec<u64>, concurrency: usize) -> Self {
18 Self {
19 issues,
20 concurrency,
21 }
22 }
23
24 pub async fn execute(&self) -> Result<()> {
25 println!(
26 "{}",
27 format!(
28 "🚀 Starting parallel execution of {} issues with concurrency {}...",
29 self.issues.len(),
30 self.concurrency
31 )
32 .cyan()
33 .bold()
34 );
35 println!();
36
37 if self.issues.is_empty() {
39 return Err(CliError::InvalidInput("No issues provided".to_string()));
40 }
41
42 if self.concurrency == 0 {
43 return Err(CliError::InvalidInput(
44 "Concurrency must be at least 1".to_string(),
45 ));
46 }
47
48 let config = self.load_config()?;
50
51 let pool_config = PoolConfig {
53 max_concurrency: self.concurrency,
54 timeout_seconds: 3600, fail_fast: false,
56 auto_cleanup: true,
57 };
58
59 println!(" Pool Configuration:");
60 println!(" Max concurrency: {}", pool_config.max_concurrency);
61 println!(" Timeout: {}s per task", pool_config.timeout_seconds);
62 println!(" Auto cleanup: {}", pool_config.auto_cleanup);
63 println!();
64
65 let pool = WorktreePool::new(pool_config).map_err(|e| {
66 CliError::ExecutionError(format!("Failed to create worktree pool: {}", e))
67 })?;
68
69 let tasks: Vec<WorktreeTask> = self
71 .issues
72 .iter()
73 .map(|&issue_number| WorktreeTask {
74 issue_number,
75 description: format!("Process Issue #{}", issue_number),
76 agent_type: Some("CoordinatorAgent".to_string()),
77 metadata: None,
78 })
79 .collect();
80
81 println!(" Tasks:");
82 for task in &tasks {
83 println!(
84 " {} Issue #{}: {}",
85 "•".dimmed(),
86 task.issue_number,
87 task.description.dimmed()
88 );
89 }
90 println!();
91
92 println!("{}", " Executing in parallel...".dimmed());
94 println!();
95
96 let start_time = std::time::Instant::now();
97
98 let result = pool
99 .execute_parallel(tasks, move |worktree_info, task| {
100 let config = config.clone();
101
102 async move {
103 println!(
104 " {} [Issue #{}] Starting in worktree {:?}",
105 "▶".green(),
106 task.issue_number,
107 worktree_info.path.file_name().unwrap_or_default()
108 );
109
110 let agent = CoordinatorAgentWithLLM::new(config);
112
113 let agent_task = Task {
115 id: format!("coordinator-issue-{}", task.issue_number),
116 title: format!("Coordinate Issue #{}", task.issue_number),
117 description: format!(
118 "Decompose Issue #{} into executable tasks",
119 task.issue_number
120 ),
121 task_type: miyabi_types::task::TaskType::Feature,
122 priority: 1,
123 severity: None,
124 impact: None,
125 assigned_agent: Some(AgentType::CoordinatorAgent),
126 dependencies: vec![],
127 estimated_duration: Some(5),
128 status: None,
129 start_time: None,
130 end_time: None,
131 metadata: Some(HashMap::from([(
132 "issue_number".to_string(),
133 serde_json::json!(task.issue_number),
134 )])),
135 };
136
137 let agent_result = agent
139 .execute(&agent_task)
140 .await
141 .map_err(|e| miyabi_types::error::MiyabiError::Unknown(e.to_string()))?;
142
143 println!(
144 " {} [Issue #{}] Completed with status: {:?}",
145 "✓".green().bold(),
146 task.issue_number,
147 agent_result.status
148 );
149
150 Ok(agent_result.data.unwrap_or_else(|| {
152 serde_json::json!({
153 "status": "completed",
154 "issue": task.issue_number
155 })
156 }))
157 }
158 })
159 .await;
160
161 let elapsed = start_time.elapsed();
162
163 println!();
164 println!("{}", "═".repeat(80).dimmed());
165 println!();
166 println!("{}", "📊 Execution Results".cyan().bold());
167 println!();
168 println!(" Summary:");
169 println!(" Total tasks: {}", result.total_tasks);
170 println!(
171 " {} Successful: {}",
172 "✓".green().bold(),
173 result.success_count
174 );
175 if result.failed_count > 0 {
176 println!(" {} Failed: {}", "✗".red().bold(), result.failed_count);
177 }
178 if result.timeout_count > 0 {
179 println!(
180 " {} Timeout: {}",
181 "⏱".yellow().bold(),
182 result.timeout_count
183 );
184 }
185 println!();
186
187 println!(" Performance:");
188 println!(" Wall time: {:.2}s", elapsed.as_secs_f64());
189 println!(
190 " Total duration: {:.2}s",
191 result.total_duration_ms as f64 / 1000.0
192 );
193 println!(" Success rate: {:.1}%", result.success_rate());
194 println!(
195 " Average task duration: {:.2}s",
196 result.average_duration_ms() / 1000.0
197 );
198
199 let sequential_time = result.average_duration_ms() * result.total_tasks as f64;
201 let parallel_time = result.total_duration_ms as f64;
202 let speedup = sequential_time / parallel_time;
203
204 println!(" Estimated speedup: {:.2}x", speedup);
205 println!();
206
207 if result.results.len() <= 10 {
209 println!(" Individual Results:");
210 for task_result in &result.results {
211 let status_icon = match task_result.status {
212 miyabi_worktree::TaskStatus::Success => "✓".green(),
213 miyabi_worktree::TaskStatus::Failed => "✗".red(),
214 miyabi_worktree::TaskStatus::Timeout => "⏱".yellow(),
215 miyabi_worktree::TaskStatus::Cancelled => "⊗".dimmed(),
216 };
217
218 println!(
219 " {} Issue #{}: {} ({:.2}s)",
220 status_icon,
221 task_result.issue_number,
222 match task_result.status {
223 miyabi_worktree::TaskStatus::Success => "Success",
224 miyabi_worktree::TaskStatus::Failed => "Failed",
225 miyabi_worktree::TaskStatus::Timeout => "Timeout",
226 miyabi_worktree::TaskStatus::Cancelled => "Cancelled",
227 },
228 task_result.duration_ms as f64 / 1000.0
229 );
230
231 if let Some(error) = &task_result.error {
232 println!(" Error: {}", error.red());
233 }
234 }
235 println!();
236 }
237
238 println!("{}", "═".repeat(80).dimmed());
239 println!();
240
241 if result.all_successful() {
243 println!("{}", "✅ All tasks completed successfully!".green().bold());
244 Ok(())
245 } else {
246 println!(
247 "{}",
248 format!(
249 "⚠️ {} of {} tasks failed",
250 result.failed_count + result.timeout_count,
251 result.total_tasks
252 )
253 .yellow()
254 .bold()
255 );
256 Err(CliError::ExecutionError(format!(
257 "{} tasks failed",
258 result.failed_count + result.timeout_count
259 )))
260 }
261 }
262
263 fn load_config(&self) -> Result<AgentConfig> {
264 let github_token = self.get_github_token()?;
266
267 let device_identifier = std::env::var("DEVICE_IDENTIFIER")
269 .unwrap_or_else(|_| hostname::get().unwrap().to_string_lossy().to_string());
270
271 let (repo_owner, repo_name) = self.parse_git_remote()?;
273
274 Ok(AgentConfig {
275 device_identifier,
276 github_token,
277 repo_owner: Some(repo_owner),
278 repo_name: Some(repo_name),
279 use_task_tool: false,
280 use_worktree: true,
281 worktree_base_path: Some(".worktrees".to_string()),
282 log_directory: "./logs".to_string(),
283 report_directory: "./reports".to_string(),
284 tech_lead_github_username: None,
285 ciso_github_username: None,
286 po_github_username: None,
287 firebase_production_project: None,
288 firebase_staging_project: None,
289 production_url: None,
290 staging_url: None,
291 })
292 }
293
294 fn get_github_token(&self) -> Result<String> {
295 if let Ok(token) = std::env::var("GITHUB_TOKEN") {
297 if !token.trim().is_empty() {
298 return Ok(token.trim().to_string());
299 }
300 }
301
302 if let Ok(output) = std::process::Command::new("gh")
304 .args(["auth", "token"])
305 .output()
306 {
307 if output.status.success() {
308 let token = String::from_utf8_lossy(&output.stdout).trim().to_string();
309 if !token.is_empty()
310 && (token.starts_with("ghp_")
311 || token.starts_with("gho_")
312 || token.starts_with("ghu_")
313 || token.starts_with("ghs_")
314 || token.starts_with("ghr_"))
315 {
316 return Ok(token);
317 }
318 }
319 }
320
321 Err(CliError::GitConfig(
322 "GitHub token not found. Set GITHUB_TOKEN or run 'gh auth login'".to_string(),
323 ))
324 }
325
326 fn parse_git_remote(&self) -> Result<(String, String)> {
327 let output = std::process::Command::new("git")
328 .args(["remote", "get-url", "origin"])
329 .output()
330 .map_err(|e| CliError::GitConfig(format!("Failed to run git command: {}", e)))?;
331
332 if !output.status.success() {
333 return Err(CliError::GitConfig(
334 "Failed to get git remote URL".to_string(),
335 ));
336 }
337
338 let remote_url = String::from_utf8_lossy(&output.stdout).trim().to_string();
339
340 if remote_url.starts_with("http") && remote_url.contains("github.com/") {
342 let parts: Vec<&str> = remote_url
343 .split("github.com/")
344 .nth(1)
345 .ok_or_else(|| CliError::GitConfig("Invalid GitHub URL".to_string()))?
346 .trim_end_matches(".git")
347 .split('/')
348 .collect();
349
350 if parts.len() >= 2 {
351 return Ok((parts[0].to_string(), parts[1].to_string()));
352 }
353 }
354
355 if remote_url.starts_with("git@github.com:") {
357 let repo_part = remote_url
358 .strip_prefix("git@github.com:")
359 .ok_or_else(|| CliError::GitConfig("Invalid SSH URL".to_string()))?
360 .trim_end_matches(".git");
361
362 let parts: Vec<&str> = repo_part.split('/').collect();
363 if parts.len() >= 2 {
364 return Ok((parts[0].to_string(), parts[1].to_string()));
365 }
366 }
367
368 Err(CliError::GitConfig(format!(
369 "Could not parse GitHub owner/repo from: {}",
370 remote_url
371 )))
372 }
373}
374
#[cfg(test)]
mod tests {
    use super::*;

    /// The constructor stores issues and concurrency exactly as given.
    #[test]
    fn test_parallel_command_creation() {
        let issues = vec![1u64, 2, 3];
        let cmd = ParallelCommand::new(issues.clone(), 2);
        assert_eq!(cmd.concurrency, 2);
        assert_eq!(cmd.issues, issues);
    }

    /// Degenerate inputs (no issues, zero concurrency) are accepted at
    /// construction time; validation is deferred to `execute`.
    #[test]
    fn test_parallel_command_validation() {
        let no_issues = ParallelCommand::new(Vec::new(), 2);
        assert_eq!(no_issues.issues.len(), 0);

        let zero_workers = ParallelCommand::new(vec![1, 2], 0);
        assert_eq!(zero_workers.concurrency, 0);
    }
}