1use std::sync::Arc;
7
8use anyhow::{Context, Result};
9use uuid::Uuid;
10
11use raps_acc::admin::AccountAdminClient;
12use raps_acc::users::{ProjectUsersClient, UpdateProjectUserRequest};
13
14use crate::bulk::executor::{
15 BulkConfig, BulkExecutor, BulkOperationResult, ItemResult, ProcessItem, ProgressUpdate,
16};
17use crate::bulk::state::{StateManager, StateUpdate};
18use crate::error::AdminError;
19use crate::filter::ProjectFilter;
20use crate::types::OperationType;
21
22#[derive(Debug, Clone)]
24pub struct BulkUpdateRoleParams {
25 pub account_id: String,
27 pub user_email: String,
29 pub new_role_id: String,
31 pub from_role_id: Option<String>,
33}
34
35#[allow(clippy::too_many_arguments)]
51pub async fn bulk_update_role<P>(
52 admin_client: &AccountAdminClient,
53 users_client: Arc<ProjectUsersClient>,
54 account_id: &str,
55 user_email: &str,
56 new_role_id: &str,
57 from_role_id: Option<&str>,
58 project_filter: &ProjectFilter,
59 config: BulkConfig,
60 on_progress: P,
61) -> Result<BulkOperationResult>
62where
63 P: Fn(ProgressUpdate) + Send + Sync + 'static,
64{
65 let user = admin_client
67 .find_user_by_email(account_id, user_email)
68 .await?
69 .ok_or_else(|| AdminError::UserNotFound {
70 email: user_email.to_string(),
71 })?;
72
73 let user_id = user.id.clone();
74
75 let all_projects = admin_client.list_all_projects(account_id).await?;
77 let filtered_projects = project_filter.apply(all_projects);
78
79 if filtered_projects.is_empty() {
80 return Ok(BulkOperationResult {
81 operation_id: Uuid::new_v4(),
82 total: 0,
83 completed: 0,
84 failed: 0,
85 skipped: 0,
86 duration: std::time::Duration::from_secs(0),
87 details: vec![],
88 });
89 }
90
91 let state_manager = StateManager::new()?;
93 let project_ids: Vec<String> = filtered_projects.iter().map(|p| p.id.clone()).collect();
94
95 let params = serde_json::json!({
96 "account_id": account_id,
97 "user_email": user_email,
98 "user_id": user_id,
99 "new_role_id": new_role_id,
100 "from_role_id": from_role_id,
101 });
102
103 let operation_id = state_manager
104 .create_operation(OperationType::UpdateRole, params, project_ids)
105 .await?;
106
107 state_manager
109 .update_state(
110 operation_id,
111 StateUpdate::StatusChanged {
112 status: crate::types::OperationStatus::InProgress,
113 },
114 )
115 .await?;
116
117 let items: Vec<ProcessItem> = filtered_projects
119 .into_iter()
120 .map(|p| ProcessItem {
121 project_id: p.id,
122 project_name: Some(p.name),
123 })
124 .collect();
125
126 let user_id_clone = user_id.clone();
128 let new_role_id_clone = new_role_id.to_string();
129 let from_role_id_clone = from_role_id.map(|s| s.to_string());
130 let users_client_clone = Arc::clone(&users_client);
131
132 let processor = move |project_id: String| {
133 let user_id = user_id_clone.clone();
134 let new_role_id = new_role_id_clone.clone();
135 let from_role_id = from_role_id_clone.clone();
136 let users_client = Arc::clone(&users_client_clone);
137
138 async move {
139 update_user_role(
140 &users_client,
141 &project_id,
142 &user_id,
143 &new_role_id,
144 from_role_id.as_deref(),
145 )
146 .await
147 }
148 };
149
150 let executor = BulkExecutor::new(config);
152 let result = executor
153 .execute(operation_id, items, processor, on_progress)
154 .await;
155
156 let final_status = if result.failed > 0 {
158 crate::types::OperationStatus::Failed
159 } else {
160 crate::types::OperationStatus::Completed
161 };
162
163 state_manager
164 .complete_operation(operation_id, final_status)
165 .await?;
166
167 Ok(result)
168}
169
170async fn update_user_role(
172 users_client: &ProjectUsersClient,
173 project_id: &str,
174 user_id: &str,
175 new_role_id: &str,
176 from_role_id: Option<&str>,
177) -> ItemResult {
178 let current_user = match users_client.get_project_user(project_id, user_id).await {
180 Ok(user) => user,
181 Err(e) => {
182 let error_str = e.to_string();
183 if error_str.contains("404") || error_str.contains("not found") {
185 return ItemResult::Skipped {
186 reason: "user_not_in_project".to_string(),
187 };
188 }
189 return ItemResult::Failed {
190 error: format!("Failed to get user: {}", error_str),
191 retryable: is_retryable_error(&error_str),
192 };
193 }
194 };
195
196 if let Some(from_role) = from_role_id {
198 let current_role = current_user.role_id.as_deref().unwrap_or("");
199 if current_role != from_role {
200 return ItemResult::Skipped {
201 reason: format!("role_mismatch: current={}", current_role),
202 };
203 }
204 }
205
206 if current_user.role_id.as_deref() == Some(new_role_id) {
208 return ItemResult::Skipped {
209 reason: "already_has_role".to_string(),
210 };
211 }
212
213 let request = UpdateProjectUserRequest {
215 role_id: Some(new_role_id.to_string()),
216 products: None,
217 };
218
219 match users_client.update_user(project_id, user_id, request).await {
220 Ok(_) => ItemResult::Success,
221 Err(e) => {
222 let error_str = e.to_string();
223 ItemResult::Failed {
224 error: error_str.clone(),
225 retryable: is_retryable_error(&error_str),
226 }
227 }
228 }
229}
230
231fn is_retryable_error(error: &str) -> bool {
233 let lower = error.to_lowercase();
234 lower.contains("429")
235 || lower.contains("rate limit")
236 || lower.contains("too many requests")
237 || lower.contains("503")
238 || lower.contains("service unavailable")
239 || lower.contains("502")
240 || lower.contains("bad gateway")
241 || lower.contains("timeout")
242 || lower.contains("connection")
243}
244
245pub async fn resume_bulk_update_role<P>(
247 users_client: Arc<ProjectUsersClient>,
248 operation_id: Uuid,
249 config: BulkConfig,
250 on_progress: P,
251) -> Result<BulkOperationResult>
252where
253 P: Fn(ProgressUpdate) + Send + Sync + 'static,
254{
255 let state_manager = StateManager::new()?;
256 let state = state_manager.load_operation(operation_id).await?;
257
258 let user_id = state.parameters["user_id"]
260 .as_str()
261 .context("Missing user_id in operation parameters")?
262 .to_string();
263
264 let new_role_id = state.parameters["new_role_id"]
265 .as_str()
266 .context("Missing new_role_id in operation parameters")?
267 .to_string();
268
269 let from_role_id = state.parameters["from_role_id"]
270 .as_str()
271 .map(|s| s.to_string());
272
273 let pending_project_ids = state_manager.get_pending_projects(&state);
275
276 if pending_project_ids.is_empty() {
277 return Ok(BulkOperationResult {
278 operation_id,
279 total: state.project_ids.len(),
280 completed: state
281 .results
282 .values()
283 .filter(|r| matches!(r.result, ItemResult::Success))
284 .count(),
285 failed: state
286 .results
287 .values()
288 .filter(|r| matches!(r.result, ItemResult::Failed { .. }))
289 .count(),
290 skipped: state
291 .results
292 .values()
293 .filter(|r| matches!(r.result, ItemResult::Skipped { .. }))
294 .count(),
295 duration: std::time::Duration::from_secs(0),
296 details: vec![],
297 });
298 }
299
300 state_manager
302 .update_state(
303 operation_id,
304 StateUpdate::StatusChanged {
305 status: crate::types::OperationStatus::InProgress,
306 },
307 )
308 .await?;
309
310 let items: Vec<ProcessItem> = pending_project_ids
312 .into_iter()
313 .map(|id| ProcessItem {
314 project_id: id,
315 project_name: None,
316 })
317 .collect();
318
319 let users_client_clone = Arc::clone(&users_client);
321
322 let processor = move |project_id: String| {
323 let user_id = user_id.clone();
324 let new_role_id = new_role_id.clone();
325 let from_role_id = from_role_id.clone();
326 let users_client = Arc::clone(&users_client_clone);
327
328 async move {
329 update_user_role(
330 &users_client,
331 &project_id,
332 &user_id,
333 &new_role_id,
334 from_role_id.as_deref(),
335 )
336 .await
337 }
338 };
339
340 let executor = BulkExecutor::new(config);
342 let result = executor
343 .execute(operation_id, items, processor, on_progress)
344 .await;
345
346 let final_status = if result.failed > 0 {
348 crate::types::OperationStatus::Failed
349 } else {
350 crate::types::OperationStatus::Completed
351 };
352
353 state_manager
354 .complete_operation(operation_id, final_status)
355 .await?;
356
357 Ok(result)
358}
359
360#[cfg(test)]
361mod tests {
362 use super::*;
363
364 #[test]
365 fn test_is_retryable_error() {
366 assert!(is_retryable_error("429 Too Many Requests"));
367 assert!(is_retryable_error("Rate limit exceeded"));
368 assert!(is_retryable_error("503 Service Unavailable"));
369 assert!(!is_retryable_error("404 Not Found"));
370 assert!(!is_retryable_error("403 Forbidden"));
371 }
372}