1use crate::domain::RepoPathTemplate;
34use crate::git::{CloneOptions, GitOperations};
35use crate::types::{OpResult, OpSummary, OwnedRepo};
36use std::path::{Path, PathBuf};
37use std::sync::Arc;
38use tokio::sync::Semaphore;
39
/// Upper bound on the number of clones run at the same time.
pub const MAX_CONCURRENCY: usize = 16;

/// Lower bound on clone concurrency: at least one clone must be able to run.
pub const MIN_CONCURRENCY: usize = 1;

/// Concurrency used when none is configured explicitly.
pub const DEFAULT_CONCURRENCY: usize = 8;

// `usize::clamp(MIN_CONCURRENCY, MAX_CONCURRENCY)` panics when min > max, and the
// default must itself be a legal value; enforce both at compile time.
const _: () = assert!(
    MIN_CONCURRENCY <= DEFAULT_CONCURRENCY && DEFAULT_CONCURRENCY <= MAX_CONCURRENCY
);
49
50pub trait CloneProgress: Send + Sync {
52 fn on_start(&self, repo: &OwnedRepo, index: usize, total: usize);
54
55 fn on_complete(&self, repo: &OwnedRepo, index: usize, total: usize);
57
58 fn on_error(&self, repo: &OwnedRepo, error: &str, index: usize, total: usize);
60
61 fn on_skip(&self, repo: &OwnedRepo, reason: &str, index: usize, total: usize);
63}
64
65#[derive(Debug, Clone, Copy, Default)]
67pub struct NoProgress;
68
69impl CloneProgress for NoProgress {
70 fn on_start(&self, _repo: &OwnedRepo, _index: usize, _total: usize) {}
71 fn on_complete(&self, _repo: &OwnedRepo, _index: usize, _total: usize) {}
72 fn on_error(&self, _repo: &OwnedRepo, _error: &str, _index: usize, _total: usize) {}
73 fn on_skip(&self, _repo: &OwnedRepo, _reason: &str, _index: usize, _total: usize) {}
74}
75
/// Outcome of processing a single repository in a clone run.
#[derive(Debug)]
pub struct CloneResult {
    /// The repository this result refers to.
    pub repo: OwnedRepo,
    /// Local path the repository was cloned into (or would have been, for
    /// skipped or failed clones).
    pub path: PathBuf,
    /// Whether the clone succeeded, was skipped, or failed, with the
    /// skip reason / error message where applicable.
    pub result: OpResult,
}
86
/// Settings controlling how a `CloneManager` lays out and clones repositories.
#[derive(Debug, Clone)]
pub struct CloneManagerOptions {
    /// Maximum number of clones run at once. `with_concurrency` and
    /// `CloneManager::new` clamp it into `MIN_CONCURRENCY..=MAX_CONCURRENCY`.
    pub concurrency: usize,
    /// Options passed to `GitOperations::clone_repo` for every clone.
    pub clone_options: CloneOptions,
    /// Path template rendered by `RepoPathTemplate` to place each checkout
    /// under the base path (default: `"{org}/{repo}"`).
    pub structure: String,
    /// Clone from the repository's `ssh_url` when `true`, otherwise from its
    /// `clone_url`.
    pub prefer_ssh: bool,
    /// When `true`, nothing is cloned and every repository is reported as
    /// skipped with the reason `"dry run"`.
    pub dry_run: bool,
}
102
103impl Default for CloneManagerOptions {
104 fn default() -> Self {
105 Self {
106 concurrency: DEFAULT_CONCURRENCY,
107 clone_options: CloneOptions::default(),
108 structure: "{org}/{repo}".to_string(),
109 prefer_ssh: true,
110 dry_run: false,
111 }
112 }
113}
114
115impl CloneManagerOptions {
116 pub fn new() -> Self {
118 Self::default()
119 }
120
121 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
126 self.concurrency = concurrency.clamp(MIN_CONCURRENCY, MAX_CONCURRENCY);
127 self
128 }
129
130 pub fn check_concurrency_cap(requested: usize) -> Option<usize> {
134 if requested > MAX_CONCURRENCY {
135 Some(MAX_CONCURRENCY)
136 } else {
137 None
138 }
139 }
140
141 pub fn with_clone_options(mut self, options: CloneOptions) -> Self {
143 self.clone_options = options;
144 self
145 }
146
147 pub fn with_structure(mut self, structure: impl Into<String>) -> Self {
149 self.structure = structure.into();
150 self
151 }
152
153 pub fn with_ssh(mut self, prefer_ssh: bool) -> Self {
155 self.prefer_ssh = prefer_ssh;
156 self
157 }
158
159 pub fn with_dry_run(mut self, dry_run: bool) -> Self {
161 self.dry_run = dry_run;
162 self
163 }
164}
165
166pub struct CloneManager<G: GitOperations> {
168 git: Arc<G>,
169 options: CloneManagerOptions,
170}
171
172impl<G: GitOperations + 'static> CloneManager<G> {
173 pub fn new(git: G, mut options: CloneManagerOptions) -> Self {
175 options.concurrency = options.concurrency.clamp(MIN_CONCURRENCY, MAX_CONCURRENCY);
176 Self {
177 git: Arc::new(git),
178 options,
179 }
180 }
181
182 pub fn compute_path(&self, base_path: &Path, repo: &OwnedRepo, provider: &str) -> PathBuf {
184 RepoPathTemplate::new(self.options.structure.clone())
185 .render_owned_repo(base_path, repo, provider)
186 }
187
188 pub fn get_clone_url<'a>(&self, repo: &'a OwnedRepo) -> &'a str {
190 if self.options.prefer_ssh {
191 &repo.repo.ssh_url
192 } else {
193 &repo.repo.clone_url
194 }
195 }
196
197 pub async fn clone_repos(
201 &self,
202 base_path: &Path,
203 repos: Vec<OwnedRepo>,
204 provider: &str,
205 progress: Arc<dyn CloneProgress>,
206 ) -> (OpSummary, Vec<CloneResult>) {
207 let total = repos.len();
208 let concurrency = self
209 .options
210 .concurrency
211 .clamp(MIN_CONCURRENCY, MAX_CONCURRENCY);
212 let semaphore = Arc::new(Semaphore::new(concurrency));
213 let mut handles = Vec::with_capacity(total);
214
215 for (index, repo) in repos.into_iter().enumerate() {
216 let permit = semaphore.clone().acquire_owned().await.unwrap();
217 let git = self.git.clone();
218 let clone_options = self.options.clone_options.clone();
219 let target_path = self.compute_path(base_path, &repo, provider);
220 let url = self.get_clone_url(&repo).to_string();
221 let dry_run = self.options.dry_run;
222 let progress = Arc::clone(&progress);
223 let panic_repo = repo.clone();
224 let panic_path = target_path.clone();
225
226 let handle = tokio::spawn(async move {
227 progress.on_start(&repo, index, total);
229 let result = if dry_run {
230 OpResult::Skipped("dry run".to_string())
231 } else if target_path.exists() {
232 OpResult::Skipped("directory already exists".to_string())
233 } else {
234 if let Some(parent) = target_path.parent() {
236 if let Err(e) = std::fs::create_dir_all(parent) {
237 OpResult::Failed(format!("Failed to create directory: {}", e))
238 } else {
239 match tokio::task::spawn_blocking({
241 let git = git.clone();
242 let url = url.clone();
243 let target_path = target_path.clone();
244 let clone_options = clone_options.clone();
245 move || git.clone_repo(&url, &target_path, &clone_options)
246 })
247 .await
248 {
249 Ok(Ok(())) => OpResult::Success,
250 Ok(Err(e)) => OpResult::Failed(e.to_string()),
251 Err(e) => OpResult::Failed(format!("Task panicked: {}", e)),
252 }
253 }
254 } else {
255 OpResult::Failed("Invalid target path".to_string())
256 }
257 };
258
259 drop(permit); CloneResult {
262 repo,
263 path: target_path,
264 result,
265 }
266 });
267
268 handles.push((panic_repo, panic_path, handle));
269 }
270
271 let mut summary = OpSummary::new();
273 let mut results = Vec::with_capacity(total);
274
275 for (index, (panic_repo, panic_path, handle)) in handles.into_iter().enumerate() {
276 match handle.await {
277 Ok(clone_result) => {
278 match &clone_result.result {
280 OpResult::Success => {
281 progress.on_complete(&clone_result.repo, index, total);
282 }
283 OpResult::Failed(err) => {
284 progress.on_error(&clone_result.repo, err, index, total);
285 }
286 OpResult::Skipped(reason) => {
287 progress.on_skip(&clone_result.repo, reason, index, total);
288 }
289 }
290
291 summary.record(&clone_result.result);
292 results.push(clone_result);
293 }
294 Err(e) => {
295 let err = format!("Task panicked: {}", e);
296 progress.on_error(&panic_repo, &err, index, total);
297 let failed = CloneResult {
298 repo: panic_repo,
299 path: panic_path,
300 result: OpResult::Failed(err),
301 };
302 summary.record(&failed.result);
303 results.push(failed);
304 }
305 }
306 }
307
308 (summary, results)
309 }
310
311 pub fn clone_single(&self, base_path: &Path, repo: &OwnedRepo, provider: &str) -> CloneResult {
313 let target_path = self.compute_path(base_path, repo, provider);
314 let url = self.get_clone_url(repo);
315
316 let result = if self.options.dry_run {
317 OpResult::Skipped("dry run".to_string())
318 } else if target_path.exists() {
319 OpResult::Skipped("directory already exists".to_string())
320 } else {
321 if let Some(parent) = target_path.parent() {
323 if let Err(e) = std::fs::create_dir_all(parent) {
324 OpResult::Failed(format!("Failed to create directory: {}", e))
325 } else {
326 match self
327 .git
328 .clone_repo(url, &target_path, &self.options.clone_options)
329 {
330 Ok(()) => OpResult::Success,
331 Err(e) => OpResult::Failed(e.to_string()),
332 }
333 }
334 } else {
335 OpResult::Failed("Invalid target path".to_string())
336 }
337 };
338
339 CloneResult {
340 repo: repo.clone(),
341 path: target_path,
342 result,
343 }
344 }
345}
346
347#[cfg(test)]
348#[path = "clone_tests.rs"]
349mod tests;