Skip to main content

git_same/operations/
sync.rs

1//! Sync manager for fetch and pull operations.
2//!
3//! This module provides functionality for syncing existing local repositories
4//! with their remotes, including parallel fetch and pull operations.
5//!
6//! # Example
7//!
8//! ```no_run
9//! use git_same::operations::sync::{SyncManager, SyncManagerOptions, SyncMode, LocalRepo, NoSyncProgress};
10//! use git_same::git::ShellGit;
11//! use git_same::types::{OwnedRepo, Repo};
12//! use std::path::PathBuf;
13//!
14//! # async fn example() {
15//! let git = ShellGit::new();
16//! let options = SyncManagerOptions::new()
17//!     .with_concurrency(4)
18//!     .with_mode(SyncMode::Fetch);
19//!
20//! let manager = SyncManager::new(git, options);
21//!
22//! // repos would come from discovery
23//! let repos: Vec<LocalRepo> = vec![];
24//! let progress = NoSyncProgress;
25//!
26//! let (summary, results) = manager
27//!     .sync_repos(repos, std::sync::Arc::new(progress))
28//!     .await;
29//!
30//! println!("Synced {} repos, {} had updates", summary.success,
31//!     results.iter().filter(|r| r.had_updates).count());
32//! # }
33//! ```
34
35use crate::git::{FetchResult, GitOperations, PullResult, RepoStatus};
36use crate::types::{OpResult, OpSummary, OwnedRepo};
37use std::path::{Path, PathBuf};
38use std::sync::Arc;
39use tokio::sync::Semaphore;
40
41use super::clone::{MAX_CONCURRENCY, MIN_CONCURRENCY};
42
43/// Progress callback for sync operations.
44pub trait SyncProgress: Send + Sync {
45    /// Called when a sync operation starts.
46    fn on_start(&self, repo: &OwnedRepo, path: &Path, index: usize, total: usize);
47
48    /// Called when a fetch completes.
49    fn on_fetch_complete(&self, repo: &OwnedRepo, result: &FetchResult, index: usize, total: usize);
50
51    /// Called when a pull completes.
52    fn on_pull_complete(&self, repo: &OwnedRepo, result: &PullResult, index: usize, total: usize);
53
54    /// Called when a sync fails.
55    fn on_error(&self, repo: &OwnedRepo, error: &str, index: usize, total: usize);
56
57    /// Called when a sync is skipped.
58    fn on_skip(&self, repo: &OwnedRepo, reason: &str, index: usize, total: usize);
59}
60
61/// A no-op progress implementation.
62#[derive(Debug, Clone, Copy, Default)]
63pub struct NoSyncProgress;
64
65impl SyncProgress for NoSyncProgress {
66    fn on_start(&self, _repo: &OwnedRepo, _path: &Path, _index: usize, _total: usize) {}
67    fn on_fetch_complete(
68        &self,
69        _repo: &OwnedRepo,
70        _result: &FetchResult,
71        _index: usize,
72        _total: usize,
73    ) {
74    }
75    fn on_pull_complete(
76        &self,
77        _repo: &OwnedRepo,
78        _result: &PullResult,
79        _index: usize,
80        _total: usize,
81    ) {
82    }
83    fn on_error(&self, _repo: &OwnedRepo, _error: &str, _index: usize, _total: usize) {}
84    fn on_skip(&self, _repo: &OwnedRepo, _reason: &str, _index: usize, _total: usize) {}
85}
86
/// How far a sync goes for each repository.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum SyncMode {
    /// Fetch from the remote only; the working tree is left untouched.
    #[default]
    Fetch,
    /// Fetch, then pull (fast-forward only).
    Pull,
}
96
97/// Result of a single sync operation.
98#[derive(Debug)]
99pub struct SyncResult {
100    /// The repository that was synced
101    pub repo: OwnedRepo,
102    /// The local path
103    pub path: PathBuf,
104    /// The operation result
105    pub result: OpResult,
106    /// Whether updates were available
107    pub had_updates: bool,
108    /// Repository status before sync
109    pub status: Option<RepoStatus>,
110    /// Fetch result (if fetch was performed)
111    pub fetch_result: Option<FetchResult>,
112    /// Pull result (if pull was performed)
113    pub pull_result: Option<PullResult>,
114}
115
116/// A repository with its local path for syncing.
117#[derive(Debug, Clone)]
118pub struct LocalRepo {
119    /// The owned repo metadata
120    pub repo: OwnedRepo,
121    /// Local filesystem path
122    pub path: PathBuf,
123}
124
125impl LocalRepo {
126    /// Creates a new local repo.
127    pub fn new(repo: OwnedRepo, path: impl Into<PathBuf>) -> Self {
128        Self {
129            repo,
130            path: path.into(),
131        }
132    }
133}
134
135/// Options for the sync manager.
136#[derive(Debug, Clone)]
137pub struct SyncManagerOptions {
138    /// Maximum number of concurrent syncs
139    pub concurrency: usize,
140    /// Sync mode (fetch or pull)
141    pub mode: SyncMode,
142    /// Skip repos with uncommitted changes
143    pub skip_uncommitted: bool,
144    /// Whether this is a dry run
145    pub dry_run: bool,
146}
147
148impl Default for SyncManagerOptions {
149    fn default() -> Self {
150        Self {
151            concurrency: crate::operations::clone::DEFAULT_CONCURRENCY,
152            mode: SyncMode::Fetch,
153            skip_uncommitted: true,
154            dry_run: false,
155        }
156    }
157}
158
159impl SyncManagerOptions {
160    /// Creates new options with defaults.
161    pub fn new() -> Self {
162        Self::default()
163    }
164
165    /// Sets the concurrency level, clamped to [MIN_CONCURRENCY, MAX_CONCURRENCY].
166    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
167        self.concurrency = concurrency.clamp(MIN_CONCURRENCY, MAX_CONCURRENCY);
168        self
169    }
170
171    /// Sets the sync mode.
172    pub fn with_mode(mut self, mode: SyncMode) -> Self {
173        self.mode = mode;
174        self
175    }
176
177    /// Sets whether to skip uncommitted repos.
178    pub fn with_skip_uncommitted(mut self, skip_uncommitted: bool) -> Self {
179        self.skip_uncommitted = skip_uncommitted;
180        self
181    }
182
183    /// Sets dry run mode.
184    pub fn with_dry_run(mut self, dry_run: bool) -> Self {
185        self.dry_run = dry_run;
186        self
187    }
188}
189
190/// Manages parallel sync operations.
191pub struct SyncManager<G: GitOperations> {
192    git: Arc<G>,
193    options: SyncManagerOptions,
194}
195
196impl<G: GitOperations + 'static> SyncManager<G> {
197    /// Creates a new sync manager.
198    pub fn new(git: G, mut options: SyncManagerOptions) -> Self {
199        options.concurrency = options.concurrency.clamp(MIN_CONCURRENCY, MAX_CONCURRENCY);
200        Self {
201            git: Arc::new(git),
202            options,
203        }
204    }
205
206    /// Syncs repositories in parallel.
207    pub async fn sync_repos(
208        &self,
209        repos: Vec<LocalRepo>,
210        progress: Arc<dyn SyncProgress>,
211    ) -> (OpSummary, Vec<SyncResult>) {
212        let total = repos.len();
213        let concurrency = self
214            .options
215            .concurrency
216            .clamp(MIN_CONCURRENCY, MAX_CONCURRENCY);
217        let semaphore = Arc::new(Semaphore::new(concurrency));
218        let mut handles = Vec::with_capacity(total);
219
220        for (index, local_repo) in repos.into_iter().enumerate() {
221            let permit = semaphore.clone().acquire_owned().await.unwrap();
222            let git = self.git.clone();
223            let mode = self.options.mode;
224            let skip_uncommitted = self.options.skip_uncommitted;
225            let dry_run = self.options.dry_run;
226            let progress = Arc::clone(&progress);
227
228            let handle = tokio::spawn(async move {
229                // Notify progress - sync starting
230                progress.on_start(&local_repo.repo, &local_repo.path, index, total);
231                let path = local_repo.path.clone();
232
233                // Check if path exists and is a repo
234                if !path.exists() {
235                    drop(permit);
236                    return SyncResult {
237                        repo: local_repo.repo,
238                        path,
239                        result: OpResult::Skipped("path does not exist".to_string()),
240                        had_updates: false,
241                        status: None,
242                        fetch_result: None,
243                        pull_result: None,
244                    };
245                }
246                if !git.is_repo(&path) {
247                    drop(permit);
248                    return SyncResult {
249                        repo: local_repo.repo,
250                        path,
251                        result: OpResult::Skipped("not a git repository".to_string()),
252                        had_updates: false,
253                        status: None,
254                        fetch_result: None,
255                        pull_result: None,
256                    };
257                }
258
259                // Get status (blocking)
260                let status = match tokio::task::spawn_blocking({
261                    let git = git.clone();
262                    let path = path.clone();
263                    move || git.status(&path)
264                })
265                .await
266                {
267                    Ok(Ok(status)) => Some(status),
268                    Ok(Err(e)) if skip_uncommitted => {
269                        drop(permit);
270                        return SyncResult {
271                            repo: local_repo.repo,
272                            path,
273                            result: OpResult::Skipped(format!("failed to get status: {}", e)),
274                            had_updates: false,
275                            status: None,
276                            fetch_result: None,
277                            pull_result: None,
278                        };
279                    }
280                    Ok(Err(_)) => None,
281                    Err(e) if skip_uncommitted => {
282                        drop(permit);
283                        return SyncResult {
284                            repo: local_repo.repo,
285                            path,
286                            result: OpResult::Skipped(format!(
287                                "failed to get status: task join error: {}",
288                                e
289                            )),
290                            had_updates: false,
291                            status: None,
292                            fetch_result: None,
293                            pull_result: None,
294                        };
295                    }
296                    Err(_) => None,
297                };
298
299                // Check if uncommitted and should skip
300                if skip_uncommitted {
301                    if let Some(ref s) = status {
302                        if s.is_uncommitted || s.has_untracked {
303                            drop(permit);
304                            return SyncResult {
305                                repo: local_repo.repo,
306                                path,
307                                result: OpResult::Skipped("uncommitted changes".to_string()),
308                                had_updates: false,
309                                status,
310                                fetch_result: None,
311                                pull_result: None,
312                            };
313                        }
314                    }
315                }
316
317                // Dry run
318                if dry_run {
319                    drop(permit);
320                    return SyncResult {
321                        repo: local_repo.repo,
322                        path,
323                        result: OpResult::Skipped("dry run".to_string()),
324                        had_updates: false,
325                        status,
326                        fetch_result: None,
327                        pull_result: None,
328                    };
329                }
330
331                // Perform fetch (blocking)
332                let fetch_result = tokio::task::spawn_blocking({
333                    let git = git.clone();
334                    let path = path.clone();
335                    move || git.fetch(&path)
336                })
337                .await;
338
339                let fetch_result = match fetch_result {
340                    Ok(Ok(r)) => r,
341                    Ok(Err(e)) => {
342                        drop(permit);
343                        return SyncResult {
344                            repo: local_repo.repo,
345                            path,
346                            result: OpResult::Failed(e.to_string()),
347                            had_updates: false,
348                            status,
349                            fetch_result: None,
350                            pull_result: None,
351                        };
352                    }
353                    Err(e) => {
354                        drop(permit);
355                        return SyncResult {
356                            repo: local_repo.repo,
357                            path,
358                            result: OpResult::Failed(format!("Task panicked: {}", e)),
359                            had_updates: false,
360                            status,
361                            fetch_result: None,
362                            pull_result: None,
363                        };
364                    }
365                };
366
367                let had_updates = fetch_result.updated;
368
369                // If pull mode and has updates, do pull
370                if mode == SyncMode::Pull && had_updates {
371                    let pull_task_result = tokio::task::spawn_blocking({
372                        let git = git.clone();
373                        let path = path.clone();
374                        move || git.pull(&path)
375                    })
376                    .await;
377
378                    let (result, actual_pull_result) = match pull_task_result {
379                        Ok(Ok(r)) if r.success => (OpResult::Success, Some(r)),
380                        Ok(Ok(r)) => (
381                            OpResult::Failed(
382                                r.error.clone().unwrap_or_else(|| "Pull failed".to_string()),
383                            ),
384                            Some(r),
385                        ),
386                        Ok(Err(e)) => (OpResult::Failed(e.to_string()), None),
387                        Err(e) => (OpResult::Failed(format!("Task panicked: {}", e)), None),
388                    };
389
390                    drop(permit);
391                    SyncResult {
392                        repo: local_repo.repo,
393                        path,
394                        result,
395                        had_updates,
396                        status,
397                        fetch_result: Some(fetch_result),
398                        pull_result: actual_pull_result,
399                    }
400                } else {
401                    drop(permit);
402                    SyncResult {
403                        repo: local_repo.repo,
404                        path,
405                        result: OpResult::Success,
406                        had_updates,
407                        status,
408                        fetch_result: Some(fetch_result),
409                        pull_result: None,
410                    }
411                }
412            });
413
414            handles.push(handle);
415        }
416
417        // Collect results
418        let mut summary = OpSummary::new();
419        let mut results = Vec::with_capacity(total);
420
421        for (index, handle) in handles.into_iter().enumerate() {
422            match handle.await {
423                Ok(sync_result) => {
424                    // Notify progress based on result using actual operation results
425                    match &sync_result.result {
426                        OpResult::Success => {
427                            if let Some(ref pull_result) = sync_result.pull_result {
428                                progress.on_pull_complete(
429                                    &sync_result.repo,
430                                    pull_result,
431                                    index,
432                                    total,
433                                );
434                            } else if let Some(ref fetch_result) = sync_result.fetch_result {
435                                progress.on_fetch_complete(
436                                    &sync_result.repo,
437                                    fetch_result,
438                                    index,
439                                    total,
440                                );
441                            }
442                        }
443                        OpResult::Failed(err) => {
444                            progress.on_error(&sync_result.repo, err, index, total);
445                        }
446                        OpResult::Skipped(reason) => {
447                            progress.on_skip(&sync_result.repo, reason, index, total);
448                        }
449                    }
450
451                    summary.record(&sync_result.result);
452                    results.push(sync_result);
453                }
454                Err(e) => {
455                    summary.record(&OpResult::Failed(format!("Task panicked: {}", e)));
456                }
457            }
458        }
459
460        (summary, results)
461    }
462
463    /// Syncs a single repository synchronously.
464    pub fn sync_single(&self, local_repo: &LocalRepo) -> SyncResult {
465        let path = &local_repo.path;
466
467        // Check if path exists
468        if !path.exists() {
469            return SyncResult {
470                repo: local_repo.repo.clone(),
471                path: path.clone(),
472                result: OpResult::Skipped("path does not exist".to_string()),
473                had_updates: false,
474                status: None,
475                fetch_result: None,
476                pull_result: None,
477            };
478        }
479        if !self.git.is_repo(path) {
480            return SyncResult {
481                repo: local_repo.repo.clone(),
482                path: path.clone(),
483                result: OpResult::Skipped("not a git repository".to_string()),
484                had_updates: false,
485                status: None,
486                fetch_result: None,
487                pull_result: None,
488            };
489        }
490
491        // Get status
492        let status = match self.git.status(path) {
493            Ok(status) => Some(status),
494            Err(e) if self.options.skip_uncommitted => {
495                return SyncResult {
496                    repo: local_repo.repo.clone(),
497                    path: path.clone(),
498                    result: OpResult::Skipped(format!("failed to get status: {}", e)),
499                    had_updates: false,
500                    status: None,
501                    fetch_result: None,
502                    pull_result: None,
503                };
504            }
505            Err(_) => None,
506        };
507
508        // Check if uncommitted
509        if self.options.skip_uncommitted {
510            if let Some(ref s) = status {
511                if s.is_uncommitted || s.has_untracked {
512                    return SyncResult {
513                        repo: local_repo.repo.clone(),
514                        path: path.clone(),
515                        result: OpResult::Skipped("uncommitted changes".to_string()),
516                        had_updates: false,
517                        status,
518                        fetch_result: None,
519                        pull_result: None,
520                    };
521                }
522            }
523        }
524
525        // Dry run
526        if self.options.dry_run {
527            return SyncResult {
528                repo: local_repo.repo.clone(),
529                path: path.clone(),
530                result: OpResult::Skipped("dry run".to_string()),
531                had_updates: false,
532                status,
533                fetch_result: None,
534                pull_result: None,
535            };
536        }
537
538        // Fetch
539        let fetch_result = match self.git.fetch(path) {
540            Ok(r) => r,
541            Err(e) => {
542                return SyncResult {
543                    repo: local_repo.repo.clone(),
544                    path: path.clone(),
545                    result: OpResult::Failed(e.to_string()),
546                    had_updates: false,
547                    status,
548                    fetch_result: None,
549                    pull_result: None,
550                };
551            }
552        };
553
554        let had_updates = fetch_result.updated;
555
556        // Pull if needed
557        if self.options.mode == SyncMode::Pull && had_updates {
558            match self.git.pull(path) {
559                Ok(r) if r.success => SyncResult {
560                    repo: local_repo.repo.clone(),
561                    path: path.clone(),
562                    result: OpResult::Success,
563                    had_updates,
564                    status,
565                    fetch_result: Some(fetch_result),
566                    pull_result: Some(r),
567                },
568                Ok(r) => SyncResult {
569                    repo: local_repo.repo.clone(),
570                    path: path.clone(),
571                    result: OpResult::Failed(
572                        r.error.clone().unwrap_or_else(|| "Pull failed".to_string()),
573                    ),
574                    had_updates,
575                    status,
576                    fetch_result: Some(fetch_result),
577                    pull_result: Some(r),
578                },
579                Err(e) => SyncResult {
580                    repo: local_repo.repo.clone(),
581                    path: path.clone(),
582                    result: OpResult::Failed(e.to_string()),
583                    had_updates,
584                    status,
585                    fetch_result: Some(fetch_result),
586                    pull_result: None,
587                },
588            }
589        } else {
590            SyncResult {
591                repo: local_repo.repo.clone(),
592                path: path.clone(),
593                result: OpResult::Success,
594                had_updates,
595                status,
596                fetch_result: Some(fetch_result),
597                pull_result: None,
598            }
599        }
600    }
601}
602
603#[cfg(test)]
604#[path = "sync_tests.rs"]
605mod tests;