Skip to main content

raps_admin/bulk/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2025 Dmytro Yemelianov
3
4//! Parallel execution engine for bulk operations
5
6use std::future::Future;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use serde::{Deserialize, Serialize};
12use tokio::sync::Semaphore;
13use uuid::Uuid;
14
15use crate::bulk::retry::exponential_backoff;
16
/// Tunable settings that control how a bulk operation is executed.
#[derive(Debug, Clone)]
pub struct BulkConfig {
    /// How many items may be processed at the same time (default: 10).
    pub concurrency: usize,
    /// Retry limit applied to each item individually (default: 5).
    pub max_retries: usize,
    /// Starting delay fed into the exponential backoff between retries
    /// (default: 1s).
    pub retry_base_delay: Duration,
    /// Whether processing should carry on after an item fails
    /// (default: true).
    pub continue_on_error: bool,
    /// Preview mode: no actual API calls are made (default: false).
    pub dry_run: bool,
}
31
32impl Default for BulkConfig {
33    fn default() -> Self {
34        Self {
35            concurrency: 10,
36            max_retries: 5,
37            retry_base_delay: Duration::from_secs(1),
38            continue_on_error: true,
39            dry_run: false,
40        }
41    }
42}
43
44/// Progress update for callbacks
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct ProgressUpdate {
47    /// Total number of items to process
48    pub total: usize,
49    /// Number of successfully completed items
50    pub completed: usize,
51    /// Number of failed items
52    pub failed: usize,
53    /// Number of skipped items
54    pub skipped: usize,
55    /// Current item being processed (for display)
56    pub current_item: Option<String>,
57    /// Estimated time remaining
58    #[serde(skip)]
59    pub estimated_remaining: Option<Duration>,
60}
61
62impl ProgressUpdate {
63    /// Calculate completion percentage
64    pub fn percentage(&self) -> f64 {
65        if self.total == 0 {
66            100.0
67        } else {
68            (self.completed + self.failed + self.skipped) as f64 / self.total as f64 * 100.0
69        }
70    }
71
72    /// Check if processing is complete
73    pub fn is_complete(&self) -> bool {
74        self.completed + self.failed + self.skipped >= self.total
75    }
76}
77
78/// Result of processing a single item
79#[derive(Debug, Clone, Serialize, Deserialize)]
80pub enum ItemResult {
81    /// Item processed successfully
82    Success,
83    /// Item was skipped (e.g., already exists)
84    Skipped { reason: String },
85    /// Item failed after all retries
86    Failed { error: String, retryable: bool },
87}
88
89/// Detail of a single item's processing result
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct ItemDetail {
92    /// Project ID this result is for
93    pub project_id: String,
94    /// Project name (for display)
95    pub project_name: Option<String>,
96    /// Processing result
97    pub result: ItemResult,
98    /// Number of attempts made
99    pub attempts: u32,
100}
101
102/// Final result of a bulk operation
103#[derive(Debug)]
104pub struct BulkOperationResult {
105    /// Unique operation identifier
106    pub operation_id: Uuid,
107    /// Total items processed
108    pub total: usize,
109    /// Successfully completed items
110    pub completed: usize,
111    /// Failed items
112    pub failed: usize,
113    /// Skipped items
114    pub skipped: usize,
115    /// Total duration
116    pub duration: Duration,
117    /// Per-item details
118    pub details: Vec<ItemDetail>,
119}
120
/// Item to be processed (with metadata).
///
/// Derives `Debug` and `Clone` like the other data types in this module so
/// that work items can be logged and duplicated by callers.
#[derive(Debug, Clone)]
pub struct ProcessItem {
    /// Project ID; this is the value handed to the processor function.
    pub project_id: String,
    /// Project name (for display), when known.
    pub project_name: Option<String>,
}
128
129/// Bulk operation executor
130///
131/// Orchestrates parallel execution of bulk operations with:
132/// - Configurable concurrency using semaphores
133/// - Retry logic with exponential backoff
134/// - Progress tracking and callbacks
135pub struct BulkExecutor {
136    config: BulkConfig,
137}
138
139impl BulkExecutor {
140    /// Create a new executor with the given configuration
141    pub fn new(config: BulkConfig) -> Self {
142        Self { config }
143    }
144
145    /// Get the configuration
146    pub fn config(&self) -> &BulkConfig {
147        &self.config
148    }
149
150    /// Execute a bulk operation with progress tracking
151    ///
152    /// # Arguments
153    /// * `operation_id` - Unique identifier for this operation
154    /// * `items` - List of items to process
155    /// * `processor` - Async function to process each item
156    /// * `on_progress` - Callback for progress updates
157    ///
158    /// # Type Parameters
159    /// * `F` - Processor function type
160    /// * `Fut` - Future returned by processor
161    pub async fn execute<F, Fut, P>(
162        &self,
163        operation_id: Uuid,
164        items: Vec<ProcessItem>,
165        processor: F,
166        on_progress: P,
167    ) -> BulkOperationResult
168    where
169        F: Fn(String) -> Fut + Send + Sync + 'static,
170        Fut: Future<Output = ItemResult> + Send,
171        P: Fn(ProgressUpdate) + Send + Sync + 'static,
172    {
173        let start_time = Instant::now();
174        let total = items.len();
175
176        // If dry-run, simulate success for all items
177        if self.config.dry_run {
178            let details: Vec<ItemDetail> = items
179                .into_iter()
180                .map(|item| ItemDetail {
181                    project_id: item.project_id,
182                    project_name: item.project_name,
183                    result: ItemResult::Skipped {
184                        reason: "dry-run mode".to_string(),
185                    },
186                    attempts: 0,
187                })
188                .collect();
189
190            on_progress(ProgressUpdate {
191                total,
192                completed: 0,
193                failed: 0,
194                skipped: total,
195                current_item: None,
196                estimated_remaining: None,
197            });
198
199            return BulkOperationResult {
200                operation_id,
201                total,
202                completed: 0,
203                failed: 0,
204                skipped: total,
205                duration: start_time.elapsed(),
206                details,
207            };
208        }
209
210        // Shared counters for progress tracking
211        let completed = Arc::new(AtomicUsize::new(0));
212        let failed = Arc::new(AtomicUsize::new(0));
213        let skipped = Arc::new(AtomicUsize::new(0));
214
215        // Semaphore for concurrency control
216        let semaphore = Arc::new(Semaphore::new(self.config.concurrency));
217
218        // Wrap processor and progress callback in Arc for sharing
219        let processor = Arc::new(processor);
220        let on_progress = Arc::new(on_progress);
221
222        // Process all items concurrently (limited by semaphore)
223        let mut handles = Vec::with_capacity(items.len());
224
225        for item in items {
226            let permit = semaphore
227                .clone()
228                .acquire_owned()
229                .await
230                .expect("semaphore closed unexpectedly");
231            let processor = Arc::clone(&processor);
232            let on_progress = Arc::clone(&on_progress);
233            let completed = Arc::clone(&completed);
234            let failed = Arc::clone(&failed);
235            let skipped = Arc::clone(&skipped);
236            let config = self.config.clone();
237
238            let handle = tokio::spawn(async move {
239                let result = process_with_retry(
240                    &item.project_id,
241                    &*processor,
242                    config.max_retries,
243                    config.retry_base_delay,
244                )
245                .await;
246
247                // Update counters
248                match &result.0 {
249                    ItemResult::Success => {
250                        completed.fetch_add(1, Ordering::SeqCst);
251                    }
252                    ItemResult::Failed { .. } => {
253                        failed.fetch_add(1, Ordering::SeqCst);
254                    }
255                    ItemResult::Skipped { .. } => {
256                        skipped.fetch_add(1, Ordering::SeqCst);
257                    }
258                }
259
260                // Send progress update
261                on_progress(ProgressUpdate {
262                    total,
263                    completed: completed.load(Ordering::SeqCst),
264                    failed: failed.load(Ordering::SeqCst),
265                    skipped: skipped.load(Ordering::SeqCst),
266                    current_item: Some(item.project_id.clone()),
267                    estimated_remaining: None,
268                });
269
270                drop(permit); // Release semaphore permit
271
272                ItemDetail {
273                    project_id: item.project_id,
274                    project_name: item.project_name,
275                    result: result.0,
276                    attempts: result.1,
277                }
278            });
279
280            handles.push(handle);
281        }
282
283        // Collect all results
284        let mut details = Vec::with_capacity(handles.len());
285        for handle in handles {
286            if let Ok(detail) = handle.await {
287                details.push(detail);
288            }
289        }
290
291        BulkOperationResult {
292            operation_id,
293            total,
294            completed: completed.load(Ordering::SeqCst),
295            failed: failed.load(Ordering::SeqCst),
296            skipped: skipped.load(Ordering::SeqCst),
297            duration: start_time.elapsed(),
298            details,
299        }
300    }
301}
302
303/// Process a single item with retry logic
304async fn process_with_retry<F, Fut>(
305    project_id: &str,
306    processor: &F,
307    max_retries: usize,
308    base_delay: Duration,
309) -> (ItemResult, u32)
310where
311    F: Fn(String) -> Fut + Send + Sync,
312    Fut: Future<Output = ItemResult> + Send,
313{
314    let max_delay = Duration::from_secs(60);
315    let mut attempts = 0u32;
316
317    loop {
318        attempts += 1;
319        let result = processor(project_id.to_string()).await;
320
321        match &result {
322            ItemResult::Success | ItemResult::Skipped { .. } => {
323                return (result, attempts);
324            }
325            ItemResult::Failed { retryable, .. } => {
326                if !retryable || attempts as usize >= max_retries {
327                    return (result, attempts);
328                }
329
330                // Wait before retry with exponential backoff
331                let delay = exponential_backoff(attempts - 1, base_delay, max_delay);
332                tokio::time::sleep(delay).await;
333            }
334        }
335    }
336}
337
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU32;

    /// Build a work item from borrowed test data.
    fn item(id: &str, name: Option<&str>) -> ProcessItem {
        ProcessItem {
            project_id: id.to_string(),
            project_name: name.map(str::to_string),
        }
    }

    /// Two items that both succeed are both counted as completed.
    #[tokio::test]
    async fn test_execute_success() {
        let executor = BulkExecutor::new(BulkConfig::default());
        let items = vec![
            item("proj-1", Some("Project 1")),
            item("proj-2", Some("Project 2")),
        ];

        let result = executor
            .execute(
                Uuid::new_v4(),
                items,
                |_project_id| async { ItemResult::Success },
                |_progress| {},
            )
            .await;

        assert_eq!(result.total, 2);
        assert_eq!(result.completed, 2);
        assert_eq!(result.failed, 0);
        assert_eq!(result.skipped, 0);
    }

    /// In dry-run mode the item is reported as skipped, not completed.
    #[tokio::test]
    async fn test_execute_dry_run() {
        let executor = BulkExecutor::new(BulkConfig {
            dry_run: true,
            ..Default::default()
        });

        let result = executor
            .execute(
                Uuid::new_v4(),
                vec![item("proj-1", None)],
                |_project_id| async { ItemResult::Success },
                |_progress| {},
            )
            .await;

        assert_eq!(result.total, 1);
        assert_eq!(result.skipped, 1);
        assert_eq!(result.completed, 0);
    }

    /// A processor that fails twice with a retryable error and then succeeds
    /// ends up completed after three attempts.
    #[tokio::test]
    async fn test_execute_with_retries() {
        let executor = BulkExecutor::new(BulkConfig {
            max_retries: 3,
            retry_base_delay: Duration::from_millis(10),
            ..Default::default()
        });

        // Shared counter recording how often the processor has been called.
        let calls = Arc::new(AtomicU32::new(0));
        let calls_for_processor = Arc::clone(&calls);

        let result = executor
            .execute(
                Uuid::new_v4(),
                vec![item("proj-1", None)],
                move |_project_id| {
                    let calls = Arc::clone(&calls_for_processor);
                    async move {
                        let previous_calls = calls.fetch_add(1, Ordering::SeqCst);
                        if previous_calls >= 2 {
                            ItemResult::Success
                        } else {
                            ItemResult::Failed {
                                error: "temporary error".to_string(),
                                retryable: true,
                            }
                        }
                    }
                },
                |_progress| {},
            )
            .await;

        assert_eq!(result.completed, 1);
        assert_eq!(result.details[0].attempts, 3); // Initial + 2 retries
    }
}