1use crate::git::{FetchResult, GitOperations, PullResult, RepoStatus};
36use crate::types::{OpResult, OpSummary, OwnedRepo};
37use std::path::{Path, PathBuf};
38use std::sync::Arc;
39use tokio::sync::Semaphore;
40
41use super::clone::{MAX_CONCURRENCY, MIN_CONCURRENCY};
42
43pub trait SyncProgress: Send + Sync {
45 fn on_start(&self, repo: &OwnedRepo, path: &Path, index: usize, total: usize);
47
48 fn on_fetch_complete(&self, repo: &OwnedRepo, result: &FetchResult, index: usize, total: usize);
50
51 fn on_pull_complete(&self, repo: &OwnedRepo, result: &PullResult, index: usize, total: usize);
53
54 fn on_error(&self, repo: &OwnedRepo, error: &str, index: usize, total: usize);
56
57 fn on_skip(&self, repo: &OwnedRepo, reason: &str, index: usize, total: usize);
59}
60
61#[derive(Debug, Clone, Copy, Default)]
63pub struct NoSyncProgress;
64
65impl SyncProgress for NoSyncProgress {
66 fn on_start(&self, _repo: &OwnedRepo, _path: &Path, _index: usize, _total: usize) {}
67 fn on_fetch_complete(
68 &self,
69 _repo: &OwnedRepo,
70 _result: &FetchResult,
71 _index: usize,
72 _total: usize,
73 ) {
74 }
75 fn on_pull_complete(
76 &self,
77 _repo: &OwnedRepo,
78 _result: &PullResult,
79 _index: usize,
80 _total: usize,
81 ) {
82 }
83 fn on_error(&self, _repo: &OwnedRepo, _error: &str, _index: usize, _total: usize) {}
84 fn on_skip(&self, _repo: &OwnedRepo, _reason: &str, _index: usize, _total: usize) {}
85}
86
/// How far a sync goes for each repository.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// Fetch from the remote only. This is the default.
    #[default]
    Fetch,
    /// Fetch, then pull when the fetch reports that updates arrived.
    Pull,
}
96
97#[derive(Debug)]
99pub struct SyncResult {
100 pub repo: OwnedRepo,
102 pub path: PathBuf,
104 pub result: OpResult,
106 pub had_updates: bool,
108 pub status: Option<RepoStatus>,
110 pub fetch_result: Option<FetchResult>,
112 pub pull_result: Option<PullResult>,
114}
115
116#[derive(Debug, Clone)]
118pub struct LocalRepo {
119 pub repo: OwnedRepo,
121 pub path: PathBuf,
123}
124
125impl LocalRepo {
126 pub fn new(repo: OwnedRepo, path: impl Into<PathBuf>) -> Self {
128 Self {
129 repo,
130 path: path.into(),
131 }
132 }
133}
134
135#[derive(Debug, Clone)]
137pub struct SyncManagerOptions {
138 pub concurrency: usize,
140 pub mode: SyncMode,
142 pub skip_uncommitted: bool,
144 pub dry_run: bool,
146}
147
148impl Default for SyncManagerOptions {
149 fn default() -> Self {
150 Self {
151 concurrency: crate::operations::clone::DEFAULT_CONCURRENCY,
152 mode: SyncMode::Fetch,
153 skip_uncommitted: true,
154 dry_run: false,
155 }
156 }
157}
158
159impl SyncManagerOptions {
160 pub fn new() -> Self {
162 Self::default()
163 }
164
165 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
167 self.concurrency = concurrency.clamp(MIN_CONCURRENCY, MAX_CONCURRENCY);
168 self
169 }
170
171 pub fn with_mode(mut self, mode: SyncMode) -> Self {
173 self.mode = mode;
174 self
175 }
176
177 pub fn with_skip_uncommitted(mut self, skip_uncommitted: bool) -> Self {
179 self.skip_uncommitted = skip_uncommitted;
180 self
181 }
182
183 pub fn with_dry_run(mut self, dry_run: bool) -> Self {
185 self.dry_run = dry_run;
186 self
187 }
188}
189
190pub struct SyncManager<G: GitOperations> {
192 git: Arc<G>,
193 options: SyncManagerOptions,
194}
195
196impl<G: GitOperations + 'static> SyncManager<G> {
197 pub fn new(git: G, mut options: SyncManagerOptions) -> Self {
199 options.concurrency = options.concurrency.clamp(MIN_CONCURRENCY, MAX_CONCURRENCY);
200 Self {
201 git: Arc::new(git),
202 options,
203 }
204 }
205
206 pub async fn sync_repos(
208 &self,
209 repos: Vec<LocalRepo>,
210 progress: Arc<dyn SyncProgress>,
211 ) -> (OpSummary, Vec<SyncResult>) {
212 let total = repos.len();
213 let concurrency = self
214 .options
215 .concurrency
216 .clamp(MIN_CONCURRENCY, MAX_CONCURRENCY);
217 let semaphore = Arc::new(Semaphore::new(concurrency));
218 let mut handles = Vec::with_capacity(total);
219
220 for (index, local_repo) in repos.into_iter().enumerate() {
221 let permit = semaphore.clone().acquire_owned().await.unwrap();
222 let git = self.git.clone();
223 let mode = self.options.mode;
224 let skip_uncommitted = self.options.skip_uncommitted;
225 let dry_run = self.options.dry_run;
226 let progress = Arc::clone(&progress);
227
228 let handle = tokio::spawn(async move {
229 progress.on_start(&local_repo.repo, &local_repo.path, index, total);
231 let path = local_repo.path.clone();
232
233 if !path.exists() {
235 drop(permit);
236 return SyncResult {
237 repo: local_repo.repo,
238 path,
239 result: OpResult::Skipped("path does not exist".to_string()),
240 had_updates: false,
241 status: None,
242 fetch_result: None,
243 pull_result: None,
244 };
245 }
246 if !git.is_repo(&path) {
247 drop(permit);
248 return SyncResult {
249 repo: local_repo.repo,
250 path,
251 result: OpResult::Skipped("not a git repository".to_string()),
252 had_updates: false,
253 status: None,
254 fetch_result: None,
255 pull_result: None,
256 };
257 }
258
259 let status = match tokio::task::spawn_blocking({
261 let git = git.clone();
262 let path = path.clone();
263 move || git.status(&path)
264 })
265 .await
266 {
267 Ok(Ok(status)) => Some(status),
268 Ok(Err(e)) if skip_uncommitted => {
269 drop(permit);
270 return SyncResult {
271 repo: local_repo.repo,
272 path,
273 result: OpResult::Skipped(format!("failed to get status: {}", e)),
274 had_updates: false,
275 status: None,
276 fetch_result: None,
277 pull_result: None,
278 };
279 }
280 Ok(Err(_)) => None,
281 Err(e) if skip_uncommitted => {
282 drop(permit);
283 return SyncResult {
284 repo: local_repo.repo,
285 path,
286 result: OpResult::Skipped(format!(
287 "failed to get status: task join error: {}",
288 e
289 )),
290 had_updates: false,
291 status: None,
292 fetch_result: None,
293 pull_result: None,
294 };
295 }
296 Err(_) => None,
297 };
298
299 if skip_uncommitted {
301 if let Some(ref s) = status {
302 if s.is_uncommitted || s.has_untracked {
303 drop(permit);
304 return SyncResult {
305 repo: local_repo.repo,
306 path,
307 result: OpResult::Skipped("uncommitted changes".to_string()),
308 had_updates: false,
309 status,
310 fetch_result: None,
311 pull_result: None,
312 };
313 }
314 }
315 }
316
317 if dry_run {
319 drop(permit);
320 return SyncResult {
321 repo: local_repo.repo,
322 path,
323 result: OpResult::Skipped("dry run".to_string()),
324 had_updates: false,
325 status,
326 fetch_result: None,
327 pull_result: None,
328 };
329 }
330
331 let fetch_result = tokio::task::spawn_blocking({
333 let git = git.clone();
334 let path = path.clone();
335 move || git.fetch(&path)
336 })
337 .await;
338
339 let fetch_result = match fetch_result {
340 Ok(Ok(r)) => r,
341 Ok(Err(e)) => {
342 drop(permit);
343 return SyncResult {
344 repo: local_repo.repo,
345 path,
346 result: OpResult::Failed(e.to_string()),
347 had_updates: false,
348 status,
349 fetch_result: None,
350 pull_result: None,
351 };
352 }
353 Err(e) => {
354 drop(permit);
355 return SyncResult {
356 repo: local_repo.repo,
357 path,
358 result: OpResult::Failed(format!("Task panicked: {}", e)),
359 had_updates: false,
360 status,
361 fetch_result: None,
362 pull_result: None,
363 };
364 }
365 };
366
367 let had_updates = fetch_result.updated;
368
369 if mode == SyncMode::Pull && had_updates {
371 let pull_task_result = tokio::task::spawn_blocking({
372 let git = git.clone();
373 let path = path.clone();
374 move || git.pull(&path)
375 })
376 .await;
377
378 let (result, actual_pull_result) = match pull_task_result {
379 Ok(Ok(r)) if r.success => (OpResult::Success, Some(r)),
380 Ok(Ok(r)) => (
381 OpResult::Failed(
382 r.error.clone().unwrap_or_else(|| "Pull failed".to_string()),
383 ),
384 Some(r),
385 ),
386 Ok(Err(e)) => (OpResult::Failed(e.to_string()), None),
387 Err(e) => (OpResult::Failed(format!("Task panicked: {}", e)), None),
388 };
389
390 drop(permit);
391 SyncResult {
392 repo: local_repo.repo,
393 path,
394 result,
395 had_updates,
396 status,
397 fetch_result: Some(fetch_result),
398 pull_result: actual_pull_result,
399 }
400 } else {
401 drop(permit);
402 SyncResult {
403 repo: local_repo.repo,
404 path,
405 result: OpResult::Success,
406 had_updates,
407 status,
408 fetch_result: Some(fetch_result),
409 pull_result: None,
410 }
411 }
412 });
413
414 handles.push(handle);
415 }
416
417 let mut summary = OpSummary::new();
419 let mut results = Vec::with_capacity(total);
420
421 for (index, handle) in handles.into_iter().enumerate() {
422 match handle.await {
423 Ok(sync_result) => {
424 match &sync_result.result {
426 OpResult::Success => {
427 if let Some(ref pull_result) = sync_result.pull_result {
428 progress.on_pull_complete(
429 &sync_result.repo,
430 pull_result,
431 index,
432 total,
433 );
434 } else if let Some(ref fetch_result) = sync_result.fetch_result {
435 progress.on_fetch_complete(
436 &sync_result.repo,
437 fetch_result,
438 index,
439 total,
440 );
441 }
442 }
443 OpResult::Failed(err) => {
444 progress.on_error(&sync_result.repo, err, index, total);
445 }
446 OpResult::Skipped(reason) => {
447 progress.on_skip(&sync_result.repo, reason, index, total);
448 }
449 }
450
451 summary.record(&sync_result.result);
452 results.push(sync_result);
453 }
454 Err(e) => {
455 summary.record(&OpResult::Failed(format!("Task panicked: {}", e)));
456 }
457 }
458 }
459
460 (summary, results)
461 }
462
463 pub fn sync_single(&self, local_repo: &LocalRepo) -> SyncResult {
465 let path = &local_repo.path;
466
467 if !path.exists() {
469 return SyncResult {
470 repo: local_repo.repo.clone(),
471 path: path.clone(),
472 result: OpResult::Skipped("path does not exist".to_string()),
473 had_updates: false,
474 status: None,
475 fetch_result: None,
476 pull_result: None,
477 };
478 }
479 if !self.git.is_repo(path) {
480 return SyncResult {
481 repo: local_repo.repo.clone(),
482 path: path.clone(),
483 result: OpResult::Skipped("not a git repository".to_string()),
484 had_updates: false,
485 status: None,
486 fetch_result: None,
487 pull_result: None,
488 };
489 }
490
491 let status = match self.git.status(path) {
493 Ok(status) => Some(status),
494 Err(e) if self.options.skip_uncommitted => {
495 return SyncResult {
496 repo: local_repo.repo.clone(),
497 path: path.clone(),
498 result: OpResult::Skipped(format!("failed to get status: {}", e)),
499 had_updates: false,
500 status: None,
501 fetch_result: None,
502 pull_result: None,
503 };
504 }
505 Err(_) => None,
506 };
507
508 if self.options.skip_uncommitted {
510 if let Some(ref s) = status {
511 if s.is_uncommitted || s.has_untracked {
512 return SyncResult {
513 repo: local_repo.repo.clone(),
514 path: path.clone(),
515 result: OpResult::Skipped("uncommitted changes".to_string()),
516 had_updates: false,
517 status,
518 fetch_result: None,
519 pull_result: None,
520 };
521 }
522 }
523 }
524
525 if self.options.dry_run {
527 return SyncResult {
528 repo: local_repo.repo.clone(),
529 path: path.clone(),
530 result: OpResult::Skipped("dry run".to_string()),
531 had_updates: false,
532 status,
533 fetch_result: None,
534 pull_result: None,
535 };
536 }
537
538 let fetch_result = match self.git.fetch(path) {
540 Ok(r) => r,
541 Err(e) => {
542 return SyncResult {
543 repo: local_repo.repo.clone(),
544 path: path.clone(),
545 result: OpResult::Failed(e.to_string()),
546 had_updates: false,
547 status,
548 fetch_result: None,
549 pull_result: None,
550 };
551 }
552 };
553
554 let had_updates = fetch_result.updated;
555
556 if self.options.mode == SyncMode::Pull && had_updates {
558 match self.git.pull(path) {
559 Ok(r) if r.success => SyncResult {
560 repo: local_repo.repo.clone(),
561 path: path.clone(),
562 result: OpResult::Success,
563 had_updates,
564 status,
565 fetch_result: Some(fetch_result),
566 pull_result: Some(r),
567 },
568 Ok(r) => SyncResult {
569 repo: local_repo.repo.clone(),
570 path: path.clone(),
571 result: OpResult::Failed(
572 r.error.clone().unwrap_or_else(|| "Pull failed".to_string()),
573 ),
574 had_updates,
575 status,
576 fetch_result: Some(fetch_result),
577 pull_result: Some(r),
578 },
579 Err(e) => SyncResult {
580 repo: local_repo.repo.clone(),
581 path: path.clone(),
582 result: OpResult::Failed(e.to_string()),
583 had_updates,
584 status,
585 fetch_result: Some(fetch_result),
586 pull_result: None,
587 },
588 }
589 } else {
590 SyncResult {
591 repo: local_repo.repo.clone(),
592 path: path.clone(),
593 result: OpResult::Success,
594 had_updates,
595 status,
596 fetch_result: Some(fetch_result),
597 pull_result: None,
598 }
599 }
600 }
601}
602
603#[cfg(test)]
604#[path = "sync_tests.rs"]
605mod tests;