agcodex_core/subagents/
worktree.rs

//! Git worktree management for parallel agent execution.
//!
//! This module gives each agent its own git worktree so that agents can work in
//! parallel without conflicts, and handles merging their changes back together.
5
6use serde::Deserialize;
7use serde::Serialize;
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::sync::Arc;
11use tokio::process::Command;
12use tokio::sync::Mutex;
13use tokio::sync::RwLock;
14use tracing::debug;
15use tracing::info;
16use tracing::warn;
17use uuid::Uuid;
18
19use crate::subagents::SubagentError;
20use crate::subagents::SubagentResult;
21
22/// Git worktree for agent isolation
23#[derive(Debug, Clone)]
24pub struct AgentWorktree {
25    /// Unique identifier for this worktree
26    pub id: Uuid,
27    /// Agent name using this worktree
28    pub agent_name: String,
29    /// Path to the worktree
30    pub path: PathBuf,
31    /// Branch name for this worktree
32    pub branch: String,
33    /// Base branch this was created from
34    pub base_branch: String,
35    /// Whether this worktree is currently active
36    pub active: bool,
37    /// Creation timestamp
38    pub created_at: std::time::SystemTime,
39}
40
41/// Conflict resolution strategy
42#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
43pub enum ConflictStrategy {
44    /// Fail on any conflict
45    Fail,
46    /// Keep changes from the incoming branch
47    KeepTheirs,
48    /// Keep changes from the current branch
49    KeepOurs,
50    /// Attempt automatic merge, fail if conflict
51    AutoMerge,
52    /// Create conflict markers for manual resolution
53    Manual,
54}
55
/// Outcome of merging agent work into a target branch.
#[derive(Debug, Clone)]
pub struct MergeResult {
    /// `true` when the merge went through.
    pub success: bool,
    /// Files the merge modified.
    pub modified_files: Vec<PathBuf>,
    /// Files left in a conflicted state, if any.
    pub conflicts: Vec<PathBuf>,
    /// Hash of the resulting commit, present on success.
    pub commit_hash: Option<String>,
    /// Description of what went wrong, present on failure.
    pub error: Option<String>,
}
70
71/// Manager for git worktrees used by agents
72pub struct WorktreeManager {
73    /// Base repository path
74    base_repo: PathBuf,
75    /// Directory for worktrees
76    worktree_dir: PathBuf,
77    /// Active worktrees
78    worktrees: Arc<RwLock<HashMap<Uuid, AgentWorktree>>>,
79    /// Lock for git operations
80    git_lock: Arc<Mutex<()>>,
81}
82
83impl WorktreeManager {
84    /// Create a new worktree manager
85    pub fn new(base_repo: PathBuf) -> SubagentResult<Self> {
86        let worktree_dir = base_repo.join(".agcodex").join("worktrees");
87
88        // Create worktree directory if it doesn't exist
89        std::fs::create_dir_all(&worktree_dir).map_err(SubagentError::Io)?;
90
91        Ok(Self {
92            base_repo,
93            worktree_dir,
94            worktrees: Arc::new(RwLock::new(HashMap::new())),
95            git_lock: Arc::new(Mutex::new(())),
96        })
97    }
98
99    /// Create a new worktree for an agent
100    pub async fn create_worktree(
101        &self,
102        agent_name: &str,
103        base_branch: Option<&str>,
104    ) -> SubagentResult<AgentWorktree> {
105        let _lock = self.git_lock.lock().await;
106
107        let id = Uuid::new_v4();
108        let branch_name = format!("agent/{}/{}", agent_name, id);
109        let worktree_path = self.worktree_dir.join(&branch_name);
110        let base_branch = base_branch.unwrap_or("main").to_string();
111
112        info!(
113            "Creating worktree for agent '{}' from branch '{}'",
114            agent_name, base_branch
115        );
116
117        // Create the worktree
118        let output = Command::new("git")
119            .args([
120                "worktree",
121                "add",
122                "-b",
123                &branch_name,
124                worktree_path.to_str().unwrap(),
125                &base_branch,
126            ])
127            .current_dir(&self.base_repo)
128            .output()
129            .await
130            .map_err(|e| {
131                SubagentError::ExecutionFailed(format!("Failed to create worktree: {}", e))
132            })?;
133
134        if !output.status.success() {
135            let stderr = String::from_utf8_lossy(&output.stderr);
136            return Err(SubagentError::ExecutionFailed(format!(
137                "Git worktree creation failed: {}",
138                stderr
139            )));
140        }
141
142        let worktree = AgentWorktree {
143            id,
144            agent_name: agent_name.to_string(),
145            path: worktree_path,
146            branch: branch_name,
147            base_branch,
148            active: true,
149            created_at: std::time::SystemTime::now(),
150        };
151
152        // Store the worktree
153        let mut worktrees = self.worktrees.write().await;
154        worktrees.insert(id, worktree.clone());
155
156        debug!("Created worktree {} for agent {}", id, agent_name);
157        Ok(worktree)
158    }
159
160    /// Remove a worktree
161    pub async fn remove_worktree(&self, id: Uuid) -> SubagentResult<()> {
162        let _lock = self.git_lock.lock().await;
163
164        let mut worktrees = self.worktrees.write().await;
165        let worktree = worktrees
166            .remove(&id)
167            .ok_or_else(|| SubagentError::ExecutionFailed(format!("Worktree {} not found", id)))?;
168
169        info!("Removing worktree {} for agent {}", id, worktree.agent_name);
170
171        // Remove the worktree
172        let output = Command::new("git")
173            .args(["worktree", "remove", worktree.path.to_str().unwrap()])
174            .current_dir(&self.base_repo)
175            .output()
176            .await
177            .map_err(|e| {
178                SubagentError::ExecutionFailed(format!("Failed to remove worktree: {}", e))
179            })?;
180
181        if !output.status.success() {
182            let stderr = String::from_utf8_lossy(&output.stderr);
183            warn!("Git worktree removal failed: {}", stderr);
184            // Try force removal
185            let _ = Command::new("git")
186                .args([
187                    "worktree",
188                    "remove",
189                    "--force",
190                    worktree.path.to_str().unwrap(),
191                ])
192                .current_dir(&self.base_repo)
193                .output()
194                .await;
195        }
196
197        // Delete the branch
198        let _ = Command::new("git")
199            .args(["branch", "-D", &worktree.branch])
200            .current_dir(&self.base_repo)
201            .output()
202            .await;
203
204        Ok(())
205    }
206
207    /// Commit changes in a worktree
208    pub async fn commit_changes(&self, worktree_id: Uuid, message: &str) -> SubagentResult<String> {
209        let worktrees = self.worktrees.read().await;
210        let worktree = worktrees.get(&worktree_id).ok_or_else(|| {
211            SubagentError::ExecutionFailed(format!("Worktree {} not found", worktree_id))
212        })?;
213
214        let worktree_path = worktree.path.clone();
215        drop(worktrees); // Release the lock
216
217        info!("Committing changes in worktree {}", worktree_id);
218
219        // Stage all changes
220        let output = Command::new("git")
221            .args(["add", "-A"])
222            .current_dir(&worktree_path)
223            .output()
224            .await
225            .map_err(|e| {
226                SubagentError::ExecutionFailed(format!("Failed to stage changes: {}", e))
227            })?;
228
229        if !output.status.success() {
230            let stderr = String::from_utf8_lossy(&output.stderr);
231            return Err(SubagentError::ExecutionFailed(format!(
232                "Git add failed: {}",
233                stderr
234            )));
235        }
236
237        // Commit changes
238        let output = Command::new("git")
239            .args(["commit", "-m", message])
240            .current_dir(&worktree_path)
241            .output()
242            .await
243            .map_err(|e| SubagentError::ExecutionFailed(format!("Failed to commit: {}", e)))?;
244
245        if !output.status.success() {
246            let stderr = String::from_utf8_lossy(&output.stderr);
247            if stderr.contains("nothing to commit") {
248                return Ok("No changes to commit".to_string());
249            }
250            return Err(SubagentError::ExecutionFailed(format!(
251                "Git commit failed: {}",
252                stderr
253            )));
254        }
255
256        // Get the commit hash
257        let output = Command::new("git")
258            .args(["rev-parse", "HEAD"])
259            .current_dir(&worktree_path)
260            .output()
261            .await
262            .map_err(|e| {
263                SubagentError::ExecutionFailed(format!("Failed to get commit hash: {}", e))
264            })?;
265
266        let commit_hash = String::from_utf8_lossy(&output.stdout).trim().to_string();
267        debug!(
268            "Committed changes in worktree {}: {}",
269            worktree_id, commit_hash
270        );
271
272        Ok(commit_hash)
273    }
274
275    /// Merge changes from multiple agents
276    pub async fn merge_agent_changes(
277        &self,
278        worktree_ids: Vec<Uuid>,
279        target_branch: &str,
280        strategy: ConflictStrategy,
281    ) -> SubagentResult<MergeResult> {
282        let _lock = self.git_lock.lock().await;
283
284        info!(
285            "Merging {} agent worktrees into branch '{}'",
286            worktree_ids.len(),
287            target_branch
288        );
289
290        let mut result = MergeResult {
291            success: true,
292            modified_files: Vec::new(),
293            conflicts: Vec::new(),
294            commit_hash: None,
295            error: None,
296        };
297
298        // Switch to target branch in main repo
299        let output = Command::new("git")
300            .args(["checkout", target_branch])
301            .current_dir(&self.base_repo)
302            .output()
303            .await
304            .map_err(|e| {
305                SubagentError::ExecutionFailed(format!("Failed to checkout target branch: {}", e))
306            })?;
307
308        if !output.status.success() {
309            let stderr = String::from_utf8_lossy(&output.stderr);
310            result.success = false;
311            result.error = Some(format!("Failed to checkout {}: {}", target_branch, stderr));
312            return Ok(result);
313        }
314
315        // Merge each worktree branch
316        for worktree_id in worktree_ids {
317            let worktrees = self.worktrees.read().await;
318            let worktree = match worktrees.get(&worktree_id) {
319                Some(w) => w.clone(),
320                None => {
321                    warn!("Worktree {} not found, skipping", worktree_id);
322                    continue;
323                }
324            };
325            drop(worktrees);
326
327            let merge_result = self.merge_single_branch(&worktree.branch, strategy).await?;
328
329            if !merge_result.success {
330                result.success = false;
331                result.conflicts.extend(merge_result.conflicts);
332                result.error = merge_result.error;
333                break;
334            }
335
336            result.modified_files.extend(merge_result.modified_files);
337        }
338
339        if result.success {
340            // Get the final commit hash
341            let output = Command::new("git")
342                .args(["rev-parse", "HEAD"])
343                .current_dir(&self.base_repo)
344                .output()
345                .await
346                .map_err(|e| {
347                    SubagentError::ExecutionFailed(format!("Failed to get commit hash: {}", e))
348                })?;
349
350            result.commit_hash = Some(String::from_utf8_lossy(&output.stdout).trim().to_string());
351        }
352
353        Ok(result)
354    }
355
356    /// Merge a single branch
357    async fn merge_single_branch(
358        &self,
359        branch: &str,
360        strategy: ConflictStrategy,
361    ) -> SubagentResult<MergeResult> {
362        info!("Merging branch '{}' with strategy {:?}", branch, strategy);
363
364        let merge_args = match strategy {
365            ConflictStrategy::Fail => vec!["merge", "--no-ff", branch],
366            ConflictStrategy::KeepTheirs => vec!["merge", "--no-ff", "-X", "theirs", branch],
367            ConflictStrategy::KeepOurs => vec!["merge", "--no-ff", "-X", "ours", branch],
368            ConflictStrategy::AutoMerge => vec!["merge", "--no-ff", branch],
369            ConflictStrategy::Manual => vec!["merge", "--no-ff", "--no-commit", branch],
370        };
371
372        let output = Command::new("git")
373            .args(&merge_args)
374            .current_dir(&self.base_repo)
375            .output()
376            .await
377            .map_err(|e| SubagentError::ExecutionFailed(format!("Failed to merge: {}", e)))?;
378
379        let mut result = MergeResult {
380            success: output.status.success(),
381            modified_files: Vec::new(),
382            conflicts: Vec::new(),
383            commit_hash: None,
384            error: None,
385        };
386
387        if !output.status.success() {
388            let stderr = String::from_utf8_lossy(&output.stderr);
389
390            // Check for conflicts
391            if stderr.contains("CONFLICT") || stderr.contains("Automatic merge failed") {
392                result.conflicts = self.get_conflicted_files().await?;
393
394                if strategy == ConflictStrategy::Fail {
395                    // Abort the merge
396                    let _ = Command::new("git")
397                        .args(["merge", "--abort"])
398                        .current_dir(&self.base_repo)
399                        .output()
400                        .await;
401
402                    result.error =
403                        Some(format!("Merge conflicts detected: {:?}", result.conflicts));
404                }
405            } else {
406                result.error = Some(format!("Merge failed: {}", stderr));
407            }
408        } else {
409            // Get modified files
410            result.modified_files = self.get_modified_files(branch).await?;
411        }
412
413        Ok(result)
414    }
415
416    /// Get list of conflicted files
417    async fn get_conflicted_files(&self) -> SubagentResult<Vec<PathBuf>> {
418        let output = Command::new("git")
419            .args(["diff", "--name-only", "--diff-filter=U"])
420            .current_dir(&self.base_repo)
421            .output()
422            .await
423            .map_err(|e| {
424                SubagentError::ExecutionFailed(format!("Failed to get conflicts: {}", e))
425            })?;
426
427        let files = String::from_utf8_lossy(&output.stdout)
428            .lines()
429            .map(|line| PathBuf::from(line.trim()))
430            .collect();
431
432        Ok(files)
433    }
434
435    /// Get list of modified files in a merge
436    async fn get_modified_files(&self, branch: &str) -> SubagentResult<Vec<PathBuf>> {
437        let output = Command::new("git")
438            .args(["diff", "--name-only", &format!("{}^", branch), branch])
439            .current_dir(&self.base_repo)
440            .output()
441            .await
442            .map_err(|e| {
443                SubagentError::ExecutionFailed(format!("Failed to get modified files: {}", e))
444            })?;
445
446        let files = String::from_utf8_lossy(&output.stdout)
447            .lines()
448            .map(|line| PathBuf::from(line.trim()))
449            .collect();
450
451        Ok(files)
452    }
453
454    /// Clean up old worktrees
455    pub async fn cleanup_old_worktrees(&self, max_age: std::time::Duration) -> SubagentResult<()> {
456        let now = std::time::SystemTime::now();
457        let worktrees = self.worktrees.read().await;
458
459        let mut to_remove = Vec::new();
460        for (id, worktree) in worktrees.iter() {
461            if let Ok(age) = now.duration_since(worktree.created_at)
462                && age > max_age
463                && !worktree.active
464            {
465                to_remove.push(*id);
466            }
467        }
468        drop(worktrees);
469
470        for id in to_remove {
471            info!("Cleaning up old worktree {}", id);
472            if let Err(e) = self.remove_worktree(id).await {
473                warn!("Failed to clean up worktree {}: {}", id, e);
474            }
475        }
476
477        Ok(())
478    }
479
480    /// List all active worktrees
481    pub async fn list_worktrees(&self) -> Vec<AgentWorktree> {
482        let worktrees = self.worktrees.read().await;
483        worktrees.values().cloned().collect()
484    }
485
486    /// Get worktree by ID
487    pub async fn get_worktree(&self, id: Uuid) -> Option<AgentWorktree> {
488        let worktrees = self.worktrees.read().await;
489        worktrees.get(&id).cloned()
490    }
491
492    /// Mark a worktree as inactive
493    pub async fn deactivate_worktree(&self, id: Uuid) -> SubagentResult<()> {
494        let mut worktrees = self.worktrees.write().await;
495        if let Some(worktree) = worktrees.get_mut(&id) {
496            worktree.active = false;
497            Ok(())
498        } else {
499            Err(SubagentError::ExecutionFailed(format!(
500                "Worktree {} not found",
501                id
502            )))
503        }
504    }
505}
506
507/// Worktree pool for managing multiple worktrees efficiently
508pub struct WorktreePool {
509    manager: Arc<WorktreeManager>,
510    /// Maximum number of concurrent worktrees
511    max_worktrees: usize,
512    /// Worktrees available for reuse
513    available: Arc<Mutex<Vec<AgentWorktree>>>,
514    /// Worktrees currently in use
515    in_use: Arc<Mutex<HashMap<Uuid, AgentWorktree>>>,
516}
517
518impl WorktreePool {
519    pub fn new(manager: Arc<WorktreeManager>, max_worktrees: usize) -> Self {
520        Self {
521            manager,
522            max_worktrees,
523            available: Arc::new(Mutex::new(Vec::new())),
524            in_use: Arc::new(Mutex::new(HashMap::new())),
525        }
526    }
527
528    /// Acquire a worktree from the pool
529    pub async fn acquire(&self, agent_name: &str) -> SubagentResult<AgentWorktree> {
530        // Try to get an available worktree
531        let mut available = self.available.lock().await;
532        if let Some(mut worktree) = available.pop() {
533            worktree.agent_name = agent_name.to_string();
534            worktree.active = true;
535
536            let mut in_use = self.in_use.lock().await;
537            in_use.insert(worktree.id, worktree.clone());
538
539            debug!("Reusing worktree {} for agent {}", worktree.id, agent_name);
540            return Ok(worktree);
541        }
542        drop(available);
543
544        // Check if we've reached the limit
545        let in_use = self.in_use.lock().await;
546        if in_use.len() >= self.max_worktrees {
547            return Err(SubagentError::ExecutionFailed(format!(
548                "Worktree pool limit ({}) reached",
549                self.max_worktrees
550            )));
551        }
552        drop(in_use);
553
554        // Create a new worktree
555        let worktree = self.manager.create_worktree(agent_name, None).await?;
556
557        let mut in_use = self.in_use.lock().await;
558        in_use.insert(worktree.id, worktree.clone());
559
560        Ok(worktree)
561    }
562
563    /// Release a worktree back to the pool
564    pub async fn release(&self, id: Uuid) -> SubagentResult<()> {
565        let mut in_use = self.in_use.lock().await;
566        if let Some(mut worktree) = in_use.remove(&id) {
567            // Reset the worktree
568            let output = Command::new("git")
569                .args(["clean", "-fd"])
570                .current_dir(&worktree.path)
571                .output()
572                .await?;
573
574            if output.status.success() {
575                let _ = Command::new("git")
576                    .args(["checkout", "."])
577                    .current_dir(&worktree.path)
578                    .output()
579                    .await;
580
581                worktree.active = false;
582
583                let mut available = self.available.lock().await;
584                available.push(worktree);
585
586                debug!("Released worktree {} back to pool", id);
587            } else {
588                // If reset fails, remove the worktree
589                warn!("Failed to reset worktree {}, removing", id);
590                self.manager.remove_worktree(id).await?;
591            }
592        }
593
594        Ok(())
595    }
596}
597
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Run `git <args>` in `repo` and fail the test immediately if it does
    /// not succeed, so a broken setup is reported where it happens.
    async fn git(repo: &std::path::Path, args: &[&str]) {
        let output = Command::new("git")
            .args(args)
            .current_dir(repo)
            .output()
            .await
            .expect("failed to spawn git");

        assert!(
            output.status.success(),
            "git {:?} failed: {}",
            args,
            String::from_utf8_lossy(&output.stderr)
        );
    }

    /// Build a throwaway repository on branch `main` with one commit and a
    /// manager attached to it. The `TempDir` must be kept alive by the caller.
    async fn setup_test_repo() -> (TempDir, WorktreeManager) {
        let temp_dir = TempDir::new().unwrap();
        let repo_path = temp_dir.path().to_path_buf();

        // Initialize git repo with main as the default branch
        git(&repo_path, &["init", "-b", "main"]).await;

        // Configure git
        git(&repo_path, &["config", "user.email", "test@example.com"]).await;
        git(&repo_path, &["config", "user.name", "Test User"]).await;

        // Create initial commit
        std::fs::write(repo_path.join("README.md"), "# Test Repo").unwrap();
        git(&repo_path, &["add", "."]).await;
        git(&repo_path, &["commit", "-m", "Initial commit"]).await;

        let manager = WorktreeManager::new(repo_path).unwrap();
        (temp_dir, manager)
    }

    #[tokio::test]
    async fn test_create_and_remove_worktree() {
        let (_temp_dir, manager) = setup_test_repo().await;

        // Create worktree
        let worktree = manager.create_worktree("test-agent", None).await.unwrap();
        assert_eq!(worktree.agent_name, "test-agent");
        assert!(worktree.path.exists());

        // Remove worktree
        manager.remove_worktree(worktree.id).await.unwrap();
        assert!(!worktree.path.exists());
    }

    #[tokio::test]
    async fn test_worktree_pool() {
        let (_temp_dir, manager) = setup_test_repo().await;
        let pool = WorktreePool::new(Arc::new(manager), 2);

        // Acquire worktrees
        let wt1 = pool.acquire("agent1").await.unwrap();
        let _wt2 = pool.acquire("agent2").await.unwrap();

        // Should fail when pool is full
        assert!(pool.acquire("agent3").await.is_err());

        // Release and reacquire
        pool.release(wt1.id).await.unwrap();
        let wt3 = pool.acquire("agent3").await.unwrap();
        assert_eq!(wt3.id, wt1.id); // Should reuse the same worktree
    }
}