Skip to main content

raps_admin/operations/
add_user.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2025 Dmytro Yemelianov
3
4//! Bulk add user operation
5
6use std::sync::Arc;
7
8use anyhow::{Context, Result};
9use uuid::Uuid;
10
11use raps_acc::admin::AccountAdminClient;
12use raps_acc::types::ProductAccess;
13use raps_acc::users::{AddProjectUserRequest, ProjectUsersClient};
14
15use crate::bulk::executor::{
16    BulkConfig, BulkExecutor, BulkOperationResult, ItemResult, ProcessItem, ProgressUpdate,
17};
18use crate::bulk::state::{StateManager, StateUpdate};
19use crate::error::AdminError;
20use crate::filter::ProjectFilter;
21use crate::types::OperationType;
22
/// Parameters for bulk add user operation
///
/// Groups the inputs that describe one "add this user to many projects"
/// request: the account, the user's email, an optional role and the product
/// access entries.
///
/// NOTE(review): none of the functions visible in this file take this struct;
/// `bulk_add_user` receives the account ID, email and role as individual
/// arguments. Presumably it is consumed by callers elsewhere - confirm.
#[derive(Debug, Clone)]
pub struct BulkAddUserParams {
    /// Account ID
    pub account_id: String,
    /// User email to add
    pub user_email: String,
    /// Role ID to assign (optional); `None` means no role is specified
    pub role_id: Option<String>,
    /// Product access configurations
    ///
    /// NOTE(review): the add-user request built in this file always sends an
    /// empty product list, so these entries do not appear to be forwarded by
    /// the code visible here - confirm whether that is intended.
    pub products: Vec<ProductAccess>,
}
35
36/// Add a user to multiple projects in bulk
37///
38/// # Arguments
39/// * `admin_client` - Client for account admin API (user/project lookup)
40/// * `users_client` - Client for project users API (add user)
41/// * `account_id` - The account ID
42/// * `user_email` - Email of the user to add
43/// * `role_id` - Optional role ID to assign
44/// * `project_filter` - Filter for selecting target projects
45/// * `config` - Bulk execution configuration
46/// * `on_progress` - Progress callback
47///
48/// # Returns
49/// Result containing the bulk operation outcome
50#[allow(clippy::too_many_arguments)]
51pub async fn bulk_add_user<P>(
52    admin_client: &AccountAdminClient,
53    users_client: Arc<ProjectUsersClient>,
54    account_id: &str,
55    user_email: &str,
56    role_id: Option<&str>,
57    project_filter: &ProjectFilter,
58    config: BulkConfig,
59    on_progress: P,
60) -> Result<BulkOperationResult>
61where
62    P: Fn(ProgressUpdate) + Send + Sync + 'static,
63{
64    // Step 1: Look up user by email to get their user ID
65    let user = admin_client
66        .find_user_by_email(account_id, user_email)
67        .await?
68        .ok_or_else(|| AdminError::UserNotFound {
69            email: user_email.to_string(),
70        })?;
71
72    let user_id = user.id.clone();
73
74    // Step 2: Get list of projects matching the filter
75    let all_projects = admin_client.list_all_projects(account_id).await?;
76    let filtered_projects = project_filter.apply(all_projects);
77
78    if filtered_projects.is_empty() {
79        return Ok(BulkOperationResult {
80            operation_id: Uuid::new_v4(),
81            total: 0,
82            completed: 0,
83            failed: 0,
84            skipped: 0,
85            duration: std::time::Duration::from_secs(0),
86            details: vec![],
87        });
88    }
89
90    // Step 3: Create operation state for resumability
91    let state_manager = StateManager::new()?;
92    let project_ids: Vec<String> = filtered_projects.iter().map(|p| p.id.clone()).collect();
93
94    let params = serde_json::json!({
95        "account_id": account_id,
96        "user_email": user_email,
97        "user_id": user_id,
98        "role_id": role_id,
99    });
100
101    let operation_id = state_manager
102        .create_operation(OperationType::AddUser, params, project_ids)
103        .await?;
104
105    // Mark operation as in progress
106    state_manager
107        .update_state(
108            operation_id,
109            StateUpdate::StatusChanged {
110                status: crate::types::OperationStatus::InProgress,
111            },
112        )
113        .await?;
114
115    // Step 4: Prepare items for processing
116    let items: Vec<ProcessItem> = filtered_projects
117        .into_iter()
118        .map(|p| ProcessItem {
119            project_id: p.id,
120            project_name: Some(p.name),
121        })
122        .collect();
123
124    // Step 5: Create the processor closure
125    let user_id_clone = user_id.clone();
126    let role_id_clone = role_id.map(|s| s.to_string());
127    let users_client_clone = Arc::clone(&users_client);
128
129    let processor = move |project_id: String| {
130        let user_id = user_id_clone.clone();
131        let role_id = role_id_clone.clone();
132        let users_client = Arc::clone(&users_client_clone);
133
134        async move {
135            add_user_to_project(&users_client, &project_id, &user_id, role_id.as_deref()).await
136        }
137    };
138
139    // Step 6: Execute bulk operation
140    let executor = BulkExecutor::new(config);
141    let result = executor
142        .execute(operation_id, items, processor, on_progress)
143        .await;
144
145    // Step 7: Update final operation status
146    let final_status = if result.failed > 0 {
147        crate::types::OperationStatus::Failed
148    } else {
149        crate::types::OperationStatus::Completed
150    };
151
152    state_manager
153        .complete_operation(operation_id, final_status)
154        .await?;
155
156    Ok(result)
157}
158
159/// Add a single user to a single project with duplicate detection
160async fn add_user_to_project(
161    users_client: &ProjectUsersClient,
162    project_id: &str,
163    user_id: &str,
164    role_id: Option<&str>,
165) -> ItemResult {
166    // Check if user already exists in the project
167    match users_client.user_exists(project_id, user_id).await {
168        Ok(true) => {
169            return ItemResult::Skipped {
170                reason: "already_exists".to_string(),
171            };
172        }
173        Ok(false) => {
174            // User doesn't exist, proceed to add
175        }
176        Err(e) => {
177            // Error checking existence - treat as retryable
178            return ItemResult::Failed {
179                error: format!("Failed to check user existence: {}", e),
180                retryable: true,
181            };
182        }
183    }
184
185    // Add the user to the project
186    let request = AddProjectUserRequest {
187        email: user_id.to_string(),
188        role_id: role_id.map(|s| s.to_string()),
189        products: vec![], // Default product access
190    };
191
192    match users_client.add_user(project_id, request).await {
193        Ok(_) => ItemResult::Success,
194        Err(e) => {
195            let error_str = e.to_string();
196            // Check if it's a rate limit or temporary error
197            let retryable = is_retryable_error(&error_str);
198            ItemResult::Failed {
199                error: error_str,
200                retryable,
201            }
202        }
203    }
204}
205
/// Check if an error is retryable
///
/// This is a text heuristic over the error message. An error is considered
/// retryable when either:
/// * the message contains (case-insensitively) a phrase that indicates rate
///   limiting or a transient transport/server problem, or
/// * the message contains one of the HTTP status codes 429, 502 or 503 as a
///   standalone token.
///
/// Status codes are matched as whole alphanumeric tokens rather than raw
/// substrings, so digits that merely occur inside an identifier (for example a
/// project UUID such as `1f5034a2-...` or a numeric ID like `4290`) do not turn
/// a permanent error into a retryable one.
fn is_retryable_error(error: &str) -> bool {
    const RETRYABLE_PHRASES: [&str; 6] = [
        "rate limit",
        "too many requests",
        "service unavailable",
        "bad gateway",
        "timeout",
        "connection",
    ];
    const RETRYABLE_STATUS_CODES: [&str; 3] = ["429", "502", "503"];

    let lower = error.to_lowercase();

    if RETRYABLE_PHRASES.iter().any(|phrase| lower.contains(phrase)) {
        return true;
    }

    // Split on anything that is not a letter or digit so "HTTP 503", "status=503"
    // and "503:" all yield the token "503", while "a503b" and "15032" do not.
    lower
        .split(|c: char| !c.is_ascii_alphanumeric())
        .any(|token| RETRYABLE_STATUS_CODES.contains(&token))
}
219
220/// Resume an interrupted bulk add user operation
221pub async fn resume_bulk_add_user<P>(
222    users_client: Arc<ProjectUsersClient>,
223    operation_id: Uuid,
224    config: BulkConfig,
225    on_progress: P,
226) -> Result<BulkOperationResult>
227where
228    P: Fn(ProgressUpdate) + Send + Sync + 'static,
229{
230    let state_manager = StateManager::new()?;
231    let state = state_manager.load_operation(operation_id).await?;
232
233    // Get the user ID from saved parameters
234    let user_id = state.parameters["user_id"]
235        .as_str()
236        .context("Missing user_id in operation parameters")?
237        .to_string();
238
239    let role_id = state.parameters["role_id"].as_str().map(|s| s.to_string());
240
241    // Get pending projects (not yet processed)
242    let pending_project_ids = state_manager.get_pending_projects(&state);
243
244    if pending_project_ids.is_empty() {
245        // All projects already processed
246        return Ok(BulkOperationResult {
247            operation_id,
248            total: state.project_ids.len(),
249            completed: state
250                .results
251                .values()
252                .filter(|r| matches!(r.result, ItemResult::Success))
253                .count(),
254            failed: state
255                .results
256                .values()
257                .filter(|r| matches!(r.result, ItemResult::Failed { .. }))
258                .count(),
259            skipped: state
260                .results
261                .values()
262                .filter(|r| matches!(r.result, ItemResult::Skipped { .. }))
263                .count(),
264            duration: std::time::Duration::from_secs(0),
265            details: vec![],
266        });
267    }
268
269    // Mark operation as in progress again
270    state_manager
271        .update_state(
272            operation_id,
273            StateUpdate::StatusChanged {
274                status: crate::types::OperationStatus::InProgress,
275            },
276        )
277        .await?;
278
279    // Prepare items for processing
280    let items: Vec<ProcessItem> = pending_project_ids
281        .into_iter()
282        .map(|id| ProcessItem {
283            project_id: id,
284            project_name: None,
285        })
286        .collect();
287
288    // Create the processor closure
289    let users_client_clone = Arc::clone(&users_client);
290
291    let processor = move |project_id: String| {
292        let user_id = user_id.clone();
293        let role_id = role_id.clone();
294        let users_client = Arc::clone(&users_client_clone);
295
296        async move {
297            add_user_to_project(&users_client, &project_id, &user_id, role_id.as_deref()).await
298        }
299    };
300
301    // Execute bulk operation
302    let executor = BulkExecutor::new(config);
303    let result = executor
304        .execute(operation_id, items, processor, on_progress)
305        .await;
306
307    // Update final operation status
308    let final_status = if result.failed > 0 {
309        crate::types::OperationStatus::Failed
310    } else {
311        crate::types::OperationStatus::Completed
312    };
313
314    state_manager
315        .complete_operation(operation_id, final_status)
316        .await?;
317
318    Ok(result)
319}
320
#[cfg(test)]
mod tests {
    use super::*;

    /// Rate-limit and transient transport/server messages are retryable;
    /// ordinary client errors are not.
    #[test]
    fn test_is_retryable_error() {
        let transient = [
            "429 Too Many Requests",
            "Rate limit exceeded",
            "503 Service Unavailable",
            "Connection timeout",
        ];
        for message in transient {
            assert!(
                is_retryable_error(message),
                "expected retryable: {message:?}"
            );
        }

        let permanent = ["404 Not Found", "400 Bad Request"];
        for message in permanent {
            assert!(
                !is_retryable_error(message),
                "expected non-retryable: {message:?}"
            );
        }
    }
}