miyabi_cli/commands/
parallel.rs

1//! Parallel command - Execute agents in parallel worktrees
2
3use crate::error::{CliError, Result};
4use colored::Colorize;
5use miyabi_agents::base::BaseAgent;
6use miyabi_agents::CoordinatorAgentWithLLM;
7use miyabi_types::{AgentConfig, AgentType, Task};
8use miyabi_worktree::{PoolConfig, WorktreePool, WorktreeTask};
9use std::collections::HashMap;
10
11pub struct ParallelCommand {
12    pub issues: Vec<u64>,
13    pub concurrency: usize,
14}
15
16impl ParallelCommand {
17    pub fn new(issues: Vec<u64>, concurrency: usize) -> Self {
18        Self {
19            issues,
20            concurrency,
21        }
22    }
23
24    pub async fn execute(&self) -> Result<()> {
25        println!(
26            "{}",
27            format!(
28                "🚀 Starting parallel execution of {} issues with concurrency {}...",
29                self.issues.len(),
30                self.concurrency
31            )
32            .cyan()
33            .bold()
34        );
35        println!();
36
37        // Validate inputs
38        if self.issues.is_empty() {
39            return Err(CliError::InvalidInput("No issues provided".to_string()));
40        }
41
42        if self.concurrency == 0 {
43            return Err(CliError::InvalidInput(
44                "Concurrency must be at least 1".to_string(),
45            ));
46        }
47
48        // Load configuration
49        let config = self.load_config()?;
50
51        // Create worktree pool
52        let pool_config = PoolConfig {
53            max_concurrency: self.concurrency,
54            timeout_seconds: 3600, // 1 hour per task
55            fail_fast: false,
56            auto_cleanup: true,
57        };
58
59        println!("  Pool Configuration:");
60        println!("    Max concurrency: {}", pool_config.max_concurrency);
61        println!("    Timeout: {}s per task", pool_config.timeout_seconds);
62        println!("    Auto cleanup: {}", pool_config.auto_cleanup);
63        println!();
64
65        let pool = WorktreePool::new(pool_config).map_err(|e| {
66            CliError::ExecutionError(format!("Failed to create worktree pool: {}", e))
67        })?;
68
69        // Create tasks for each issue
70        let tasks: Vec<WorktreeTask> = self
71            .issues
72            .iter()
73            .map(|&issue_number| WorktreeTask {
74                issue_number,
75                description: format!("Process Issue #{}", issue_number),
76                agent_type: Some("CoordinatorAgent".to_string()),
77                metadata: None,
78            })
79            .collect();
80
81        println!("  Tasks:");
82        for task in &tasks {
83            println!(
84                "    {} Issue #{}: {}",
85                "•".dimmed(),
86                task.issue_number,
87                task.description.dimmed()
88            );
89        }
90        println!();
91
92        // Execute in parallel
93        println!("{}", "  Executing in parallel...".dimmed());
94        println!();
95
96        let start_time = std::time::Instant::now();
97
98        let result = pool
99            .execute_parallel(tasks, move |worktree_info, task| {
100                let config = config.clone();
101
102                async move {
103                    println!(
104                        "    {} [Issue #{}] Starting in worktree {:?}",
105                        "▶".green(),
106                        task.issue_number,
107                        worktree_info.path.file_name().unwrap_or_default()
108                    );
109
110                    // Create CoordinatorAgent with LLM
111                    let agent = CoordinatorAgentWithLLM::new(config);
112
113                    // Create coordinator task
114                    let agent_task = Task {
115                        id: format!("coordinator-issue-{}", task.issue_number),
116                        title: format!("Coordinate Issue #{}", task.issue_number),
117                        description: format!(
118                            "Decompose Issue #{} into executable tasks",
119                            task.issue_number
120                        ),
121                        task_type: miyabi_types::task::TaskType::Feature,
122                        priority: 1,
123                        severity: None,
124                        impact: None,
125                        assigned_agent: Some(AgentType::CoordinatorAgent),
126                        dependencies: vec![],
127                        estimated_duration: Some(5),
128                        status: None,
129                        start_time: None,
130                        end_time: None,
131                        metadata: Some(HashMap::from([(
132                            "issue_number".to_string(),
133                            serde_json::json!(task.issue_number),
134                        )])),
135                    };
136
137                    // Execute agent
138                    let agent_result = agent
139                        .execute(&agent_task)
140                        .await
141                        .map_err(|e| miyabi_types::error::MiyabiError::Unknown(e.to_string()))?;
142
143                    println!(
144                        "    {} [Issue #{}] Completed with status: {:?}",
145                        "✓".green().bold(),
146                        task.issue_number,
147                        agent_result.status
148                    );
149
150                    // Return result data
151                    Ok(agent_result.data.unwrap_or_else(|| {
152                        serde_json::json!({
153                            "status": "completed",
154                            "issue": task.issue_number
155                        })
156                    }))
157                }
158            })
159            .await;
160
161        let elapsed = start_time.elapsed();
162
163        println!();
164        println!("{}", "═".repeat(80).dimmed());
165        println!();
166        println!("{}", "📊 Execution Results".cyan().bold());
167        println!();
168        println!("  Summary:");
169        println!("    Total tasks: {}", result.total_tasks);
170        println!(
171            "    {} Successful: {}",
172            "✓".green().bold(),
173            result.success_count
174        );
175        if result.failed_count > 0 {
176            println!("    {} Failed: {}", "✗".red().bold(), result.failed_count);
177        }
178        if result.timeout_count > 0 {
179            println!(
180                "    {} Timeout: {}",
181                "⏱".yellow().bold(),
182                result.timeout_count
183            );
184        }
185        println!();
186
187        println!("  Performance:");
188        println!("    Wall time: {:.2}s", elapsed.as_secs_f64());
189        println!(
190            "    Total duration: {:.2}s",
191            result.total_duration_ms as f64 / 1000.0
192        );
193        println!("    Success rate: {:.1}%", result.success_rate());
194        println!(
195            "    Average task duration: {:.2}s",
196            result.average_duration_ms() / 1000.0
197        );
198
199        // Calculate speedup
200        let sequential_time = result.average_duration_ms() * result.total_tasks as f64;
201        let parallel_time = result.total_duration_ms as f64;
202        let speedup = sequential_time / parallel_time;
203
204        println!("    Estimated speedup: {:.2}x", speedup);
205        println!();
206
207        // Show individual results
208        if result.results.len() <= 10 {
209            println!("  Individual Results:");
210            for task_result in &result.results {
211                let status_icon = match task_result.status {
212                    miyabi_worktree::TaskStatus::Success => "✓".green(),
213                    miyabi_worktree::TaskStatus::Failed => "✗".red(),
214                    miyabi_worktree::TaskStatus::Timeout => "⏱".yellow(),
215                    miyabi_worktree::TaskStatus::Cancelled => "⊗".dimmed(),
216                };
217
218                println!(
219                    "    {} Issue #{}: {} ({:.2}s)",
220                    status_icon,
221                    task_result.issue_number,
222                    match task_result.status {
223                        miyabi_worktree::TaskStatus::Success => "Success",
224                        miyabi_worktree::TaskStatus::Failed => "Failed",
225                        miyabi_worktree::TaskStatus::Timeout => "Timeout",
226                        miyabi_worktree::TaskStatus::Cancelled => "Cancelled",
227                    },
228                    task_result.duration_ms as f64 / 1000.0
229                );
230
231                if let Some(error) = &task_result.error {
232                    println!("      Error: {}", error.red());
233                }
234            }
235            println!();
236        }
237
238        println!("{}", "═".repeat(80).dimmed());
239        println!();
240
241        // Exit with appropriate status
242        if result.all_successful() {
243            println!("{}", "✅ All tasks completed successfully!".green().bold());
244            Ok(())
245        } else {
246            println!(
247                "{}",
248                format!(
249                    "⚠️  {} of {} tasks failed",
250                    result.failed_count + result.timeout_count,
251                    result.total_tasks
252                )
253                .yellow()
254                .bold()
255            );
256            Err(CliError::ExecutionError(format!(
257                "{} tasks failed",
258                result.failed_count + result.timeout_count
259            )))
260        }
261    }
262
263    fn load_config(&self) -> Result<AgentConfig> {
264        // Get GitHub token
265        let github_token = self.get_github_token()?;
266
267        // Get device identifier
268        let device_identifier = std::env::var("DEVICE_IDENTIFIER")
269            .unwrap_or_else(|_| hostname::get().unwrap().to_string_lossy().to_string());
270
271        // Parse repository owner and name from git remote
272        let (repo_owner, repo_name) = self.parse_git_remote()?;
273
274        Ok(AgentConfig {
275            device_identifier,
276            github_token,
277            repo_owner: Some(repo_owner),
278            repo_name: Some(repo_name),
279            use_task_tool: false,
280            use_worktree: true,
281            worktree_base_path: Some(".worktrees".to_string()),
282            log_directory: "./logs".to_string(),
283            report_directory: "./reports".to_string(),
284            tech_lead_github_username: None,
285            ciso_github_username: None,
286            po_github_username: None,
287            firebase_production_project: None,
288            firebase_staging_project: None,
289            production_url: None,
290            staging_url: None,
291        })
292    }
293
294    fn get_github_token(&self) -> Result<String> {
295        // Try environment variable first
296        if let Ok(token) = std::env::var("GITHUB_TOKEN") {
297            if !token.trim().is_empty() {
298                return Ok(token.trim().to_string());
299            }
300        }
301
302        // Try gh CLI
303        if let Ok(output) = std::process::Command::new("gh")
304            .args(["auth", "token"])
305            .output()
306        {
307            if output.status.success() {
308                let token = String::from_utf8_lossy(&output.stdout).trim().to_string();
309                if !token.is_empty()
310                    && (token.starts_with("ghp_")
311                        || token.starts_with("gho_")
312                        || token.starts_with("ghu_")
313                        || token.starts_with("ghs_")
314                        || token.starts_with("ghr_"))
315                {
316                    return Ok(token);
317                }
318            }
319        }
320
321        Err(CliError::GitConfig(
322            "GitHub token not found. Set GITHUB_TOKEN or run 'gh auth login'".to_string(),
323        ))
324    }
325
326    fn parse_git_remote(&self) -> Result<(String, String)> {
327        let output = std::process::Command::new("git")
328            .args(["remote", "get-url", "origin"])
329            .output()
330            .map_err(|e| CliError::GitConfig(format!("Failed to run git command: {}", e)))?;
331
332        if !output.status.success() {
333            return Err(CliError::GitConfig(
334                "Failed to get git remote URL".to_string(),
335            ));
336        }
337
338        let remote_url = String::from_utf8_lossy(&output.stdout).trim().to_string();
339
340        // Parse HTTPS format
341        if remote_url.starts_with("http") && remote_url.contains("github.com/") {
342            let parts: Vec<&str> = remote_url
343                .split("github.com/")
344                .nth(1)
345                .ok_or_else(|| CliError::GitConfig("Invalid GitHub URL".to_string()))?
346                .trim_end_matches(".git")
347                .split('/')
348                .collect();
349
350            if parts.len() >= 2 {
351                return Ok((parts[0].to_string(), parts[1].to_string()));
352            }
353        }
354
355        // Parse SSH format
356        if remote_url.starts_with("git@github.com:") {
357            let repo_part = remote_url
358                .strip_prefix("git@github.com:")
359                .ok_or_else(|| CliError::GitConfig("Invalid SSH URL".to_string()))?
360                .trim_end_matches(".git");
361
362            let parts: Vec<&str> = repo_part.split('/').collect();
363            if parts.len() >= 2 {
364                return Ok((parts[0].to_string(), parts[1].to_string()));
365            }
366        }
367
368        Err(CliError::GitConfig(format!(
369            "Could not parse GitHub owner/repo from: {}",
370            remote_url
371        )))
372    }
373}
374
375#[cfg(test)]
376mod tests {
377    use super::*;
378
379    #[test]
380    fn test_parallel_command_creation() {
381        let cmd = ParallelCommand::new(vec![1, 2, 3], 2);
382        assert_eq!(cmd.issues, vec![1, 2, 3]);
383        assert_eq!(cmd.concurrency, 2);
384    }
385
386    #[test]
387    fn test_parallel_command_validation() {
388        let cmd = ParallelCommand::new(vec![], 2);
389        // Empty issues should fail validation in execute()
390        assert_eq!(cmd.issues.len(), 0);
391
392        let cmd = ParallelCommand::new(vec![1, 2], 0);
393        // Zero concurrency should fail validation in execute()
394        assert_eq!(cmd.concurrency, 0);
395    }
396}