Skip to main content

git_same/operations/
clone.rs

1//! Parallel cloning operations.
2//!
3//! This module provides functionality for cloning repositories,
4//! including parallel cloning with controlled concurrency.
5//!
6//! # Example
7//!
8//! ```no_run
9//! use git_same::operations::clone::{CloneManager, CloneManagerOptions, NoProgress};
10//! use git_same::git::ShellGit;
11//! use std::path::Path;
12//!
13//! # async fn example() {
14//! let git = ShellGit::new();
15//! let options = CloneManagerOptions::new()
16//!     .with_concurrency(4)
17//!     .with_structure("{org}/{repo}");
18//!
19//! let manager = CloneManager::new(git, options);
20//!
21//! // repos would come from discovery
22//! let repos = vec![];
23//! let progress = NoProgress;
24//!
25//! let (summary, results) = manager
26//!     .clone_repos(Path::new("~/github"), repos, "github", std::sync::Arc::new(progress))
27//!     .await;
28//!
29//! println!("Cloned {} repos, {} failed", summary.success, summary.failed);
30//! # }
31//! ```
32
33use crate::domain::RepoPathTemplate;
34use crate::git::{CloneOptions, GitOperations};
35use crate::types::{OpResult, OpSummary, OwnedRepo};
36use std::path::{Path, PathBuf};
37use std::sync::Arc;
38use tokio::sync::Semaphore;
39
/// Lower bound on concurrency: at least one clone always runs at a time.
pub const MIN_CONCURRENCY: usize = 1;

/// Concurrency used when the configuration does not specify a value.
pub const DEFAULT_CONCURRENCY: usize = 8;

/// Upper bound on concurrency, to keep resource usage in check.
///
/// Going higher risks "too many open files" errors and saturating the network.
pub const MAX_CONCURRENCY: usize = 16;
49
/// Progress callback for clone operations.
///
/// [`CloneManager::clone_repos`] reports every repository through these
/// methods. `index` is the repository's zero-based position in the input
/// list and `total` is the length of that list. Callbacks may be invoked from
/// spawned tasks running on other threads, which is why implementations must
/// be `Send + Sync`.
pub trait CloneProgress: Send + Sync {
    /// Called when a clone starts.
    ///
    /// This fires before the dry-run / existing-directory checks, so it is
    /// also called for repositories that end up skipped.
    fn on_start(&self, repo: &OwnedRepo, index: usize, total: usize);

    /// Called when a clone completes successfully.
    fn on_complete(&self, repo: &OwnedRepo, index: usize, total: usize);

    /// Called when a clone fails, with a human-readable error message.
    ///
    /// This also covers worker tasks that panicked.
    fn on_error(&self, repo: &OwnedRepo, error: &str, index: usize, total: usize);

    /// Called when a clone is skipped, with the reason (e.g. a dry run or an
    /// already-existing target directory).
    fn on_skip(&self, repo: &OwnedRepo, reason: &str, index: usize, total: usize);
}
64
65/// A no-op progress implementation for when no progress reporting is needed.
66#[derive(Debug, Clone, Copy, Default)]
67pub struct NoProgress;
68
69impl CloneProgress for NoProgress {
70    fn on_start(&self, _repo: &OwnedRepo, _index: usize, _total: usize) {}
71    fn on_complete(&self, _repo: &OwnedRepo, _index: usize, _total: usize) {}
72    fn on_error(&self, _repo: &OwnedRepo, _error: &str, _index: usize, _total: usize) {}
73    fn on_skip(&self, _repo: &OwnedRepo, _reason: &str, _index: usize, _total: usize) {}
74}
75
/// Result of a single clone operation.
///
/// Produced by [`CloneManager::clone_repos`] (one per input repository) and by
/// [`CloneManager::clone_single`].
#[derive(Debug)]
pub struct CloneResult {
    /// The repository the operation was attempted for
    pub repo: OwnedRepo,
    /// The local target path (computed even when the clone was skipped or failed)
    pub path: PathBuf,
    /// The operation result: success, failure message, or skip reason
    pub result: OpResult,
}
86
/// Options for the clone manager.
///
/// Start from [`CloneManagerOptions::new`] (or `Default`) and adjust with the
/// `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct CloneManagerOptions {
    /// Maximum number of concurrent clones.
    ///
    /// The field is public, so it can hold any value; [`CloneManager::new`]
    /// clamps it to [`MIN_CONCURRENCY`]`..=`[`MAX_CONCURRENCY`].
    pub concurrency: usize,
    /// Clone options (depth, branch, submodules)
    pub clone_options: CloneOptions,
    /// Directory structure template, rendered under the base path passed to
    /// the clone calls.
    /// Supports the placeholders `{provider}`, `{org}` and `{repo}`.
    pub structure: String,
    /// Whether to clone via the repository's SSH URL (`true`) or its HTTPS
    /// clone URL (`false`)
    pub prefer_ssh: bool,
    /// Whether this is a dry run: every repository is reported as skipped and
    /// nothing is cloned or created on disk
    pub dry_run: bool,
}
102
103impl Default for CloneManagerOptions {
104    fn default() -> Self {
105        Self {
106            concurrency: DEFAULT_CONCURRENCY,
107            clone_options: CloneOptions::default(),
108            structure: "{org}/{repo}".to_string(),
109            prefer_ssh: true,
110            dry_run: false,
111        }
112    }
113}
114
115impl CloneManagerOptions {
116    /// Creates new options with defaults.
117    pub fn new() -> Self {
118        Self::default()
119    }
120
121    /// Sets the concurrency level, clamped to [MIN_CONCURRENCY, MAX_CONCURRENCY].
122    ///
123    /// Returns the options with the effective concurrency set.
124    /// Use [`effective_concurrency`] to check if the value was capped.
125    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
126        self.concurrency = concurrency.clamp(MIN_CONCURRENCY, MAX_CONCURRENCY);
127        self
128    }
129
130    /// Checks if a requested concurrency exceeds the maximum.
131    ///
132    /// Returns `Some(MAX_CONCURRENCY)` if capping occurred, `None` otherwise.
133    pub fn check_concurrency_cap(requested: usize) -> Option<usize> {
134        if requested > MAX_CONCURRENCY {
135            Some(MAX_CONCURRENCY)
136        } else {
137            None
138        }
139    }
140
141    /// Sets the clone options.
142    pub fn with_clone_options(mut self, options: CloneOptions) -> Self {
143        self.clone_options = options;
144        self
145    }
146
147    /// Sets the directory structure.
148    pub fn with_structure(mut self, structure: impl Into<String>) -> Self {
149        self.structure = structure.into();
150        self
151    }
152
153    /// Sets SSH preference.
154    pub fn with_ssh(mut self, prefer_ssh: bool) -> Self {
155        self.prefer_ssh = prefer_ssh;
156        self
157    }
158
159    /// Sets dry run mode.
160    pub fn with_dry_run(mut self, dry_run: bool) -> Self {
161        self.dry_run = dry_run;
162        self
163    }
164}
165
166/// Manages parallel clone operations.
167pub struct CloneManager<G: GitOperations> {
168    git: Arc<G>,
169    options: CloneManagerOptions,
170}
171
172impl<G: GitOperations + 'static> CloneManager<G> {
173    /// Creates a new clone manager.
174    pub fn new(git: G, mut options: CloneManagerOptions) -> Self {
175        options.concurrency = options.concurrency.clamp(MIN_CONCURRENCY, MAX_CONCURRENCY);
176        Self {
177            git: Arc::new(git),
178            options,
179        }
180    }
181
182    /// Computes the local path for a repository.
183    pub fn compute_path(&self, base_path: &Path, repo: &OwnedRepo, provider: &str) -> PathBuf {
184        RepoPathTemplate::new(self.options.structure.clone())
185            .render_owned_repo(base_path, repo, provider)
186    }
187
188    /// Gets the clone URL for a repository.
189    pub fn get_clone_url<'a>(&self, repo: &'a OwnedRepo) -> &'a str {
190        if self.options.prefer_ssh {
191            &repo.repo.ssh_url
192        } else {
193            &repo.repo.clone_url
194        }
195    }
196
197    /// Clones repositories in parallel.
198    ///
199    /// Returns a summary of operations and individual results.
200    pub async fn clone_repos(
201        &self,
202        base_path: &Path,
203        repos: Vec<OwnedRepo>,
204        provider: &str,
205        progress: Arc<dyn CloneProgress>,
206    ) -> (OpSummary, Vec<CloneResult>) {
207        let total = repos.len();
208        let concurrency = self
209            .options
210            .concurrency
211            .clamp(MIN_CONCURRENCY, MAX_CONCURRENCY);
212        let semaphore = Arc::new(Semaphore::new(concurrency));
213        let mut handles = Vec::with_capacity(total);
214
215        for (index, repo) in repos.into_iter().enumerate() {
216            let permit = semaphore.clone().acquire_owned().await.unwrap();
217            let git = self.git.clone();
218            let clone_options = self.options.clone_options.clone();
219            let target_path = self.compute_path(base_path, &repo, provider);
220            let url = self.get_clone_url(&repo).to_string();
221            let dry_run = self.options.dry_run;
222            let progress = Arc::clone(&progress);
223            let panic_repo = repo.clone();
224            let panic_path = target_path.clone();
225
226            let handle = tokio::spawn(async move {
227                // Notify progress - clone starting
228                progress.on_start(&repo, index, total);
229                let result = if dry_run {
230                    OpResult::Skipped("dry run".to_string())
231                } else if target_path.exists() {
232                    OpResult::Skipped("directory already exists".to_string())
233                } else {
234                    // Create parent directories
235                    if let Some(parent) = target_path.parent() {
236                        if let Err(e) = std::fs::create_dir_all(parent) {
237                            OpResult::Failed(format!("Failed to create directory: {}", e))
238                        } else {
239                            // Perform the clone (blocking operation)
240                            match tokio::task::spawn_blocking({
241                                let git = git.clone();
242                                let url = url.clone();
243                                let target_path = target_path.clone();
244                                let clone_options = clone_options.clone();
245                                move || git.clone_repo(&url, &target_path, &clone_options)
246                            })
247                            .await
248                            {
249                                Ok(Ok(())) => OpResult::Success,
250                                Ok(Err(e)) => OpResult::Failed(e.to_string()),
251                                Err(e) => OpResult::Failed(format!("Task panicked: {}", e)),
252                            }
253                        }
254                    } else {
255                        OpResult::Failed("Invalid target path".to_string())
256                    }
257                };
258
259                drop(permit); // Release semaphore
260
261                CloneResult {
262                    repo,
263                    path: target_path,
264                    result,
265                }
266            });
267
268            handles.push((panic_repo, panic_path, handle));
269        }
270
271        // Collect results
272        let mut summary = OpSummary::new();
273        let mut results = Vec::with_capacity(total);
274
275        for (index, (panic_repo, panic_path, handle)) in handles.into_iter().enumerate() {
276            match handle.await {
277                Ok(clone_result) => {
278                    // Notify progress
279                    match &clone_result.result {
280                        OpResult::Success => {
281                            progress.on_complete(&clone_result.repo, index, total);
282                        }
283                        OpResult::Failed(err) => {
284                            progress.on_error(&clone_result.repo, err, index, total);
285                        }
286                        OpResult::Skipped(reason) => {
287                            progress.on_skip(&clone_result.repo, reason, index, total);
288                        }
289                    }
290
291                    summary.record(&clone_result.result);
292                    results.push(clone_result);
293                }
294                Err(e) => {
295                    let err = format!("Task panicked: {}", e);
296                    progress.on_error(&panic_repo, &err, index, total);
297                    let failed = CloneResult {
298                        repo: panic_repo,
299                        path: panic_path,
300                        result: OpResult::Failed(err),
301                    };
302                    summary.record(&failed.result);
303                    results.push(failed);
304                }
305            }
306        }
307
308        (summary, results)
309    }
310
311    /// Clones a single repository synchronously.
312    pub fn clone_single(&self, base_path: &Path, repo: &OwnedRepo, provider: &str) -> CloneResult {
313        let target_path = self.compute_path(base_path, repo, provider);
314        let url = self.get_clone_url(repo);
315
316        let result = if self.options.dry_run {
317            OpResult::Skipped("dry run".to_string())
318        } else if target_path.exists() {
319            OpResult::Skipped("directory already exists".to_string())
320        } else {
321            // Create parent directories
322            if let Some(parent) = target_path.parent() {
323                if let Err(e) = std::fs::create_dir_all(parent) {
324                    OpResult::Failed(format!("Failed to create directory: {}", e))
325                } else {
326                    match self
327                        .git
328                        .clone_repo(url, &target_path, &self.options.clone_options)
329                    {
330                        Ok(()) => OpResult::Success,
331                        Err(e) => OpResult::Failed(e.to_string()),
332                    }
333                }
334            } else {
335                OpResult::Failed("Invalid target path".to_string())
336            }
337        };
338
339        CloneResult {
340            repo: repo.clone(),
341            path: target_path,
342            result,
343        }
344    }
345}
346
// Unit tests live in the separate file `clone_tests.rs` and are compiled only
// for test builds.
#[cfg(test)]
#[path = "clone_tests.rs"]
mod tests;