1use crate::auth::{get_auth_for_provider, AuthResult};
4use crate::cache::{CacheManager, DiscoveryCache};
5use crate::config::{Config, WorkspaceConfig};
6use crate::discovery::DiscoveryOrchestrator;
7use crate::errors::{AppError, Result};
8use crate::git::{CloneOptions, ShellGit};
9use crate::operations::clone::{
10 CloneManager, CloneManagerOptions, CloneProgress, MAX_CONCURRENCY, MIN_CONCURRENCY,
11};
12use crate::operations::sync::{
13 LocalRepo, SyncManager, SyncManagerOptions, SyncMode, SyncProgress, SyncResult,
14};
15use crate::provider::{create_provider, DiscoveryProgress};
16use crate::types::{ActionPlan, OpSummary, OwnedRepo};
17use std::collections::{HashMap, HashSet};
18use std::path::PathBuf;
19use std::sync::Arc;
20use tracing::warn;
21
22pub struct SyncWorkspaceRequest<'a> {
24 pub config: &'a Config,
25 pub workspace: &'a WorkspaceConfig,
26 pub refresh: bool,
27 pub skip_uncommitted: bool,
28 pub pull: bool,
29 pub concurrency_override: Option<usize>,
30 pub create_base_path: bool,
31}
32
33pub struct PreparedSyncWorkspace {
35 pub workspace: WorkspaceConfig,
36 pub auth: AuthResult,
37 pub repos: Vec<OwnedRepo>,
38 pub used_cache: bool,
39 pub cache_age_secs: Option<u64>,
40 pub base_path: PathBuf,
41 pub structure: String,
42 pub provider_name: String,
43 pub provider_prefer_ssh: bool,
44 pub skip_uncommitted: bool,
45 pub sync_mode: SyncMode,
46 pub requested_concurrency: usize,
47 pub effective_concurrency: usize,
48 pub plan: ActionPlan,
49 pub to_sync: Vec<LocalRepo>,
50 pub skipped_sync: Vec<(OwnedRepo, String)>,
51 pub clone_options: CloneOptions,
52}
53
54pub struct SyncExecutionOutcome {
56 pub clone_summary: Option<OpSummary>,
57 pub sync_summary: Option<OpSummary>,
58 pub sync_results: Vec<SyncResult>,
59}
60
61pub async fn prepare_sync_workspace(
63 request: SyncWorkspaceRequest<'_>,
64 discovery_progress: &dyn DiscoveryProgress,
65) -> Result<PreparedSyncWorkspace> {
66 let auth = get_auth_for_provider(&request.workspace.provider)?;
68 let provider = create_provider(&request.workspace.provider, &auth.token)?;
69
70 let mut filters = request.workspace.filters.clone();
72 if !request.workspace.orgs.is_empty() {
73 filters.orgs = request.workspace.orgs.clone();
74 }
75 filters.exclude_repos = request.workspace.exclude_repos.clone();
76
77 let structure = request
78 .workspace
79 .structure
80 .clone()
81 .unwrap_or_else(|| request.config.structure.clone());
82 let orchestrator = DiscoveryOrchestrator::new(filters, structure.clone());
83
84 let mut repos = Vec::new();
86 let mut used_cache = false;
87 let mut cache_age_secs = None;
88
89 if !request.refresh {
90 if let Ok(cache_manager) = CacheManager::for_workspace(&request.workspace.root_path) {
91 if let Ok(Some(cache)) = cache_manager.load() {
92 let discovery_options = orchestrator.to_discovery_options();
93 used_cache = true;
94 cache_age_secs = Some(cache.age_secs());
95 for provider_repos in cache.repos.values() {
96 repos.extend(
97 provider_repos
98 .iter()
99 .filter(|owned| {
100 discovery_options.should_include_org(&owned.owner)
101 && discovery_options.should_include(&owned.repo)
102 })
103 .cloned(),
104 );
105 }
106
107 let org_count = repos
110 .iter()
111 .map(|r| r.owner.clone())
112 .collect::<HashSet<_>>()
113 .len();
114 discovery_progress.on_orgs_discovered(org_count);
115 let mut by_org: HashMap<String, usize> = HashMap::new();
116 for repo in &repos {
117 *by_org.entry(repo.owner.clone()).or_insert(0) += 1;
118 }
119 for (org, count) in by_org {
120 discovery_progress.on_org_complete(&org, count);
121 }
122 }
123 }
124 }
125
126 if repos.is_empty() {
127 repos = orchestrator
128 .discover(provider.as_ref(), discovery_progress)
129 .await
130 .map_err(AppError::Provider)?;
131
132 if let Ok(cache_manager) = CacheManager::for_workspace(&request.workspace.root_path) {
133 let provider_label = request.workspace.provider.kind.slug().to_string();
134 let mut repos_by_provider = HashMap::new();
135 repos_by_provider.insert(provider_label, repos.clone());
136 let cache =
137 DiscoveryCache::new(auth.username.clone().unwrap_or_default(), repos_by_provider);
138 if let Err(e) = cache_manager.save(&cache) {
139 warn!(
140 workspace = %request.workspace.root_path.display(),
141 error = %e,
142 "Failed to save discovery cache"
143 );
144 }
145 }
146 }
147
148 let base_path = request.workspace.expanded_base_path();
149 if !base_path.exists() {
150 if request.create_base_path {
151 std::fs::create_dir_all(&base_path).map_err(|e| {
152 AppError::path(format!(
153 "Failed to create base directory '{}': {}",
154 base_path.display(),
155 e
156 ))
157 })?;
158 } else {
159 return Err(AppError::config(format!(
160 "Base path does not exist: {}",
161 base_path.display()
162 )));
163 }
164 }
165
166 let provider_name = request.workspace.provider.kind.slug().to_string();
167 let git = ShellGit::new();
168 let plan = orchestrator.plan_clone(&base_path, repos.clone(), &provider_name, &git);
169 let (to_sync, skipped_sync) = orchestrator.plan_sync(
170 &base_path,
171 repos.clone(),
172 &provider_name,
173 &git,
174 request.skip_uncommitted,
175 );
176
177 let requested_concurrency = request
178 .concurrency_override
179 .or(request.workspace.concurrency)
180 .unwrap_or(request.config.concurrency);
181 let effective_concurrency = requested_concurrency.clamp(MIN_CONCURRENCY, MAX_CONCURRENCY);
182
183 let sync_mode = if request.pull {
184 SyncMode::Pull
185 } else {
186 match request
187 .workspace
188 .sync_mode
189 .unwrap_or(request.config.sync_mode)
190 {
191 crate::config::SyncMode::Pull => SyncMode::Pull,
192 crate::config::SyncMode::Fetch => SyncMode::Fetch,
193 }
194 };
195
196 let clone_options = CloneOptions {
197 depth: request
198 .workspace
199 .clone_options
200 .as_ref()
201 .map(|c| c.depth)
202 .unwrap_or(request.config.clone.depth),
203 branch: request
204 .workspace
205 .clone_options
206 .as_ref()
207 .and_then(|c| {
208 if c.branch.is_empty() {
209 None
210 } else {
211 Some(c.branch.clone())
212 }
213 })
214 .or_else(|| {
215 if request.config.clone.branch.is_empty() {
216 None
217 } else {
218 Some(request.config.clone.branch.clone())
219 }
220 }),
221 recurse_submodules: request
222 .workspace
223 .clone_options
224 .as_ref()
225 .map(|c| c.recurse_submodules)
226 .unwrap_or(request.config.clone.recurse_submodules),
227 };
228
229 Ok(PreparedSyncWorkspace {
230 workspace: request.workspace.clone(),
231 auth,
232 repos,
233 used_cache,
234 cache_age_secs,
235 base_path,
236 structure,
237 provider_name,
238 provider_prefer_ssh: request.workspace.provider.prefer_ssh,
239 skip_uncommitted: request.skip_uncommitted,
240 sync_mode,
241 requested_concurrency,
242 effective_concurrency,
243 plan,
244 to_sync,
245 skipped_sync,
246 clone_options,
247 })
248}
249
250pub async fn execute_prepared_sync(
252 prepared: &PreparedSyncWorkspace,
253 dry_run: bool,
254 clone_progress: Arc<dyn CloneProgress>,
255 sync_progress: Arc<dyn SyncProgress>,
256) -> SyncExecutionOutcome {
257 if dry_run {
258 return SyncExecutionOutcome {
259 clone_summary: None,
260 sync_summary: None,
261 sync_results: Vec::new(),
262 };
263 }
264
265 let mut clone_summary = None;
266 let mut sync_summary = None;
267 let mut sync_results = Vec::new();
268
269 if !prepared.plan.to_clone.is_empty() {
270 let clone_options = CloneManagerOptions::new()
271 .with_concurrency(prepared.effective_concurrency)
272 .with_clone_options(prepared.clone_options.clone())
273 .with_structure(prepared.structure.clone())
274 .with_ssh(prepared.provider_prefer_ssh);
275
276 let manager = CloneManager::new(ShellGit::new(), clone_options);
277 let (summary, _results) = manager
278 .clone_repos(
279 &prepared.base_path,
280 prepared.plan.to_clone.clone(),
281 &prepared.provider_name,
282 clone_progress,
283 )
284 .await;
285 clone_summary = Some(summary);
286 }
287
288 if !prepared.to_sync.is_empty() {
289 let sync_options = SyncManagerOptions::new()
290 .with_concurrency(prepared.effective_concurrency)
291 .with_mode(prepared.sync_mode)
292 .with_skip_uncommitted(prepared.skip_uncommitted);
293
294 let manager = SyncManager::new(ShellGit::new(), sync_options);
295 let (summary, results) = manager
296 .sync_repos(prepared.to_sync.clone(), sync_progress)
297 .await;
298 sync_summary = Some(summary);
299 sync_results = results;
300 }
301
302 SyncExecutionOutcome {
303 clone_summary,
304 sync_summary,
305 sync_results,
306 }
307}
308
309#[cfg(test)]
310#[path = "sync_workspace_tests.rs"]
311mod tests;