1use std::sync::Arc;
7
8use anyhow::{Context, Result};
9use uuid::Uuid;
10
11use raps_acc::admin::AccountAdminClient;
12use raps_acc::types::ProductAccess;
13use raps_acc::users::{AddProjectUserRequest, ProjectUsersClient};
14
15use crate::bulk::executor::{
16 BulkConfig, BulkExecutor, BulkOperationResult, ItemResult, ProcessItem, ProgressUpdate,
17};
18use crate::bulk::state::{StateManager, StateUpdate};
19use crate::error::AdminError;
20use crate::filter::ProjectFilter;
21use crate::types::OperationType;
22
23#[derive(Debug, Clone)]
25pub struct BulkAddUserParams {
26 pub account_id: String,
28 pub user_email: String,
30 pub role_id: Option<String>,
32 pub products: Vec<ProductAccess>,
34}
35
36#[allow(clippy::too_many_arguments)]
51pub async fn bulk_add_user<P>(
52 admin_client: &AccountAdminClient,
53 users_client: Arc<ProjectUsersClient>,
54 account_id: &str,
55 user_email: &str,
56 role_id: Option<&str>,
57 project_filter: &ProjectFilter,
58 config: BulkConfig,
59 on_progress: P,
60) -> Result<BulkOperationResult>
61where
62 P: Fn(ProgressUpdate) + Send + Sync + 'static,
63{
64 let user = admin_client
66 .find_user_by_email(account_id, user_email)
67 .await?
68 .ok_or_else(|| AdminError::UserNotFound {
69 email: user_email.to_string(),
70 })?;
71
72 let user_id = user.id.clone();
73
74 let all_projects = admin_client.list_all_projects(account_id).await?;
76 let filtered_projects = project_filter.apply(all_projects);
77
78 if filtered_projects.is_empty() {
79 return Ok(BulkOperationResult {
80 operation_id: Uuid::new_v4(),
81 total: 0,
82 completed: 0,
83 failed: 0,
84 skipped: 0,
85 duration: std::time::Duration::from_secs(0),
86 details: vec![],
87 });
88 }
89
90 let state_manager = StateManager::new()?;
92 let project_ids: Vec<String> = filtered_projects.iter().map(|p| p.id.clone()).collect();
93
94 let params = serde_json::json!({
95 "account_id": account_id,
96 "user_email": user_email,
97 "user_id": user_id,
98 "role_id": role_id,
99 });
100
101 let operation_id = state_manager
102 .create_operation(OperationType::AddUser, params, project_ids)
103 .await?;
104
105 state_manager
107 .update_state(
108 operation_id,
109 StateUpdate::StatusChanged {
110 status: crate::types::OperationStatus::InProgress,
111 },
112 )
113 .await?;
114
115 let items: Vec<ProcessItem> = filtered_projects
117 .into_iter()
118 .map(|p| ProcessItem {
119 project_id: p.id,
120 project_name: Some(p.name),
121 })
122 .collect();
123
124 let user_id_clone = user_id.clone();
126 let role_id_clone = role_id.map(|s| s.to_string());
127 let users_client_clone = Arc::clone(&users_client);
128
129 let processor = move |project_id: String| {
130 let user_id = user_id_clone.clone();
131 let role_id = role_id_clone.clone();
132 let users_client = Arc::clone(&users_client_clone);
133
134 async move {
135 add_user_to_project(&users_client, &project_id, &user_id, role_id.as_deref()).await
136 }
137 };
138
139 let executor = BulkExecutor::new(config);
141 let result = executor
142 .execute(operation_id, items, processor, on_progress)
143 .await;
144
145 let final_status = if result.failed > 0 {
147 crate::types::OperationStatus::Failed
148 } else {
149 crate::types::OperationStatus::Completed
150 };
151
152 state_manager
153 .complete_operation(operation_id, final_status)
154 .await?;
155
156 Ok(result)
157}
158
159async fn add_user_to_project(
161 users_client: &ProjectUsersClient,
162 project_id: &str,
163 user_id: &str,
164 role_id: Option<&str>,
165) -> ItemResult {
166 match users_client.user_exists(project_id, user_id).await {
168 Ok(true) => {
169 return ItemResult::Skipped {
170 reason: "already_exists".to_string(),
171 };
172 }
173 Ok(false) => {
174 }
176 Err(e) => {
177 return ItemResult::Failed {
179 error: format!("Failed to check user existence: {}", e),
180 retryable: true,
181 };
182 }
183 }
184
185 let request = AddProjectUserRequest {
187 email: user_id.to_string(),
188 role_id: role_id.map(|s| s.to_string()),
189 products: vec![], };
191
192 match users_client.add_user(project_id, request).await {
193 Ok(_) => ItemResult::Success,
194 Err(e) => {
195 let error_str = e.to_string();
196 let retryable = is_retryable_error(&error_str);
198 ItemResult::Failed {
199 error: error_str,
200 retryable,
201 }
202 }
203 }
204}
205
/// Heuristically decides whether an error message describes a transient
/// failure that is worth retrying.
///
/// The check is a case-insensitive substring match against a fixed list of
/// markers: rate limiting (429), gateway/availability problems (502, 503,
/// 504), timeouts (both the "timeout" and "timed out" phrasings), and
/// connection-level failures. Anything else is treated as permanent.
fn is_retryable_error(error: &str) -> bool {
    // Markers are lowercase because the message is lowercased before matching.
    const TRANSIENT_MARKERS: &[&str] = &[
        "429",
        "rate limit",
        "too many requests",
        "503",
        "service unavailable",
        "502",
        "bad gateway",
        "504",
        "timeout",
        // Many I/O and HTTP errors say "timed out" rather than "timeout".
        "timed out",
        "connection",
    ];

    let lower = error.to_lowercase();
    TRANSIENT_MARKERS
        .iter()
        .any(|marker| lower.contains(*marker))
}
219
220pub async fn resume_bulk_add_user<P>(
222 users_client: Arc<ProjectUsersClient>,
223 operation_id: Uuid,
224 config: BulkConfig,
225 on_progress: P,
226) -> Result<BulkOperationResult>
227where
228 P: Fn(ProgressUpdate) + Send + Sync + 'static,
229{
230 let state_manager = StateManager::new()?;
231 let state = state_manager.load_operation(operation_id).await?;
232
233 let user_id = state.parameters["user_id"]
235 .as_str()
236 .context("Missing user_id in operation parameters")?
237 .to_string();
238
239 let role_id = state.parameters["role_id"].as_str().map(|s| s.to_string());
240
241 let pending_project_ids = state_manager.get_pending_projects(&state);
243
244 if pending_project_ids.is_empty() {
245 return Ok(BulkOperationResult {
247 operation_id,
248 total: state.project_ids.len(),
249 completed: state
250 .results
251 .values()
252 .filter(|r| matches!(r.result, ItemResult::Success))
253 .count(),
254 failed: state
255 .results
256 .values()
257 .filter(|r| matches!(r.result, ItemResult::Failed { .. }))
258 .count(),
259 skipped: state
260 .results
261 .values()
262 .filter(|r| matches!(r.result, ItemResult::Skipped { .. }))
263 .count(),
264 duration: std::time::Duration::from_secs(0),
265 details: vec![],
266 });
267 }
268
269 state_manager
271 .update_state(
272 operation_id,
273 StateUpdate::StatusChanged {
274 status: crate::types::OperationStatus::InProgress,
275 },
276 )
277 .await?;
278
279 let items: Vec<ProcessItem> = pending_project_ids
281 .into_iter()
282 .map(|id| ProcessItem {
283 project_id: id,
284 project_name: None,
285 })
286 .collect();
287
288 let users_client_clone = Arc::clone(&users_client);
290
291 let processor = move |project_id: String| {
292 let user_id = user_id.clone();
293 let role_id = role_id.clone();
294 let users_client = Arc::clone(&users_client_clone);
295
296 async move {
297 add_user_to_project(&users_client, &project_id, &user_id, role_id.as_deref()).await
298 }
299 };
300
301 let executor = BulkExecutor::new(config);
303 let result = executor
304 .execute(operation_id, items, processor, on_progress)
305 .await;
306
307 let final_status = if result.failed > 0 {
309 crate::types::OperationStatus::Failed
310 } else {
311 crate::types::OperationStatus::Completed
312 };
313
314 state_manager
315 .complete_operation(operation_id, final_status)
316 .await?;
317
318 Ok(result)
319}
320
#[cfg(test)]
mod tests {
    use super::*;

    /// Transient-looking messages must be retryable; client errors must not.
    #[test]
    fn test_is_retryable_error() {
        let transient = [
            "429 Too Many Requests",
            "Rate limit exceeded",
            "503 Service Unavailable",
            "Connection timeout",
        ];
        for &message in &transient {
            assert!(
                is_retryable_error(message),
                "expected retryable: {}",
                message
            );
        }

        let permanent = ["404 Not Found", "400 Bad Request"];
        for &message in &permanent {
            assert!(
                !is_retryable_error(message),
                "expected non-retryable: {}",
                message
            );
        }
    }
}