1use std::future::Future;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11use serde::{Deserialize, Serialize};
12use tokio::sync::Semaphore;
13use uuid::Uuid;
14
15use crate::bulk::retry::exponential_backoff;
16
/// Settings that control how [`BulkExecutor`] runs a bulk operation.
#[derive(Debug, Clone)]
pub struct BulkConfig {
    /// Maximum number of items processed at the same time.
    pub concurrency: usize,
    /// Upper bound on processing attempts per item; the first attempt counts
    /// towards this limit.
    pub max_retries: usize,
    /// Base delay handed to the backoff calculation between attempts.
    pub retry_base_delay: Duration,
    /// NOTE(review): presumably controls whether the run keeps going after an
    /// item fails; this flag is not read by the code in this file, so confirm
    /// against callers.
    pub continue_on_error: bool,
    /// When `true`, no item is processed; every item is reported as skipped.
    pub dry_run: bool,
}
31
32impl Default for BulkConfig {
33 fn default() -> Self {
34 Self {
35 concurrency: 10,
36 max_retries: 5,
37 retry_base_delay: Duration::from_secs(1),
38 continue_on_error: true,
39 dry_run: false,
40 }
41 }
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct ProgressUpdate {
47 pub total: usize,
49 pub completed: usize,
51 pub failed: usize,
53 pub skipped: usize,
55 pub current_item: Option<String>,
57 #[serde(skip)]
59 pub estimated_remaining: Option<Duration>,
60}
61
62impl ProgressUpdate {
63 pub fn percentage(&self) -> f64 {
65 if self.total == 0 {
66 100.0
67 } else {
68 (self.completed + self.failed + self.skipped) as f64 / self.total as f64 * 100.0
69 }
70 }
71
72 pub fn is_complete(&self) -> bool {
74 self.completed + self.failed + self.skipped >= self.total
75 }
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
80pub enum ItemResult {
81 Success,
83 Skipped { reason: String },
85 Failed { error: String, retryable: bool },
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct ItemDetail {
92 pub project_id: String,
94 pub project_name: Option<String>,
96 pub result: ItemResult,
98 pub attempts: u32,
100}
101
102#[derive(Debug)]
104pub struct BulkOperationResult {
105 pub operation_id: Uuid,
107 pub total: usize,
109 pub completed: usize,
111 pub failed: usize,
113 pub skipped: usize,
115 pub duration: Duration,
117 pub details: Vec<ItemDetail>,
119}
120
/// One unit of work submitted to [`BulkExecutor::execute`].
#[derive(Debug, Clone)]
pub struct ProcessItem {
    /// Identifier handed to the processor callback for this item.
    pub project_id: String,
    /// Optional display name, copied verbatim into the item's result detail.
    pub project_name: Option<String>,
}
128
129pub struct BulkExecutor {
136 config: BulkConfig,
137}
138
139impl BulkExecutor {
140 pub fn new(config: BulkConfig) -> Self {
142 Self { config }
143 }
144
145 pub fn config(&self) -> &BulkConfig {
147 &self.config
148 }
149
150 pub async fn execute<F, Fut, P>(
162 &self,
163 operation_id: Uuid,
164 items: Vec<ProcessItem>,
165 processor: F,
166 on_progress: P,
167 ) -> BulkOperationResult
168 where
169 F: Fn(String) -> Fut + Send + Sync + 'static,
170 Fut: Future<Output = ItemResult> + Send,
171 P: Fn(ProgressUpdate) + Send + Sync + 'static,
172 {
173 let start_time = Instant::now();
174 let total = items.len();
175
176 if self.config.dry_run {
178 let details: Vec<ItemDetail> = items
179 .into_iter()
180 .map(|item| ItemDetail {
181 project_id: item.project_id,
182 project_name: item.project_name,
183 result: ItemResult::Skipped {
184 reason: "dry-run mode".to_string(),
185 },
186 attempts: 0,
187 })
188 .collect();
189
190 on_progress(ProgressUpdate {
191 total,
192 completed: 0,
193 failed: 0,
194 skipped: total,
195 current_item: None,
196 estimated_remaining: None,
197 });
198
199 return BulkOperationResult {
200 operation_id,
201 total,
202 completed: 0,
203 failed: 0,
204 skipped: total,
205 duration: start_time.elapsed(),
206 details,
207 };
208 }
209
210 let completed = Arc::new(AtomicUsize::new(0));
212 let failed = Arc::new(AtomicUsize::new(0));
213 let skipped = Arc::new(AtomicUsize::new(0));
214
215 let semaphore = Arc::new(Semaphore::new(self.config.concurrency));
217
218 let processor = Arc::new(processor);
220 let on_progress = Arc::new(on_progress);
221
222 let mut handles = Vec::with_capacity(items.len());
224
225 for item in items {
226 let permit = semaphore
227 .clone()
228 .acquire_owned()
229 .await
230 .expect("semaphore closed unexpectedly");
231 let processor = Arc::clone(&processor);
232 let on_progress = Arc::clone(&on_progress);
233 let completed = Arc::clone(&completed);
234 let failed = Arc::clone(&failed);
235 let skipped = Arc::clone(&skipped);
236 let config = self.config.clone();
237
238 let handle = tokio::spawn(async move {
239 let result = process_with_retry(
240 &item.project_id,
241 &*processor,
242 config.max_retries,
243 config.retry_base_delay,
244 )
245 .await;
246
247 match &result.0 {
249 ItemResult::Success => {
250 completed.fetch_add(1, Ordering::SeqCst);
251 }
252 ItemResult::Failed { .. } => {
253 failed.fetch_add(1, Ordering::SeqCst);
254 }
255 ItemResult::Skipped { .. } => {
256 skipped.fetch_add(1, Ordering::SeqCst);
257 }
258 }
259
260 on_progress(ProgressUpdate {
262 total,
263 completed: completed.load(Ordering::SeqCst),
264 failed: failed.load(Ordering::SeqCst),
265 skipped: skipped.load(Ordering::SeqCst),
266 current_item: Some(item.project_id.clone()),
267 estimated_remaining: None,
268 });
269
270 drop(permit); ItemDetail {
273 project_id: item.project_id,
274 project_name: item.project_name,
275 result: result.0,
276 attempts: result.1,
277 }
278 });
279
280 handles.push(handle);
281 }
282
283 let mut details = Vec::with_capacity(handles.len());
285 for handle in handles {
286 if let Ok(detail) = handle.await {
287 details.push(detail);
288 }
289 }
290
291 BulkOperationResult {
292 operation_id,
293 total,
294 completed: completed.load(Ordering::SeqCst),
295 failed: failed.load(Ordering::SeqCst),
296 skipped: skipped.load(Ordering::SeqCst),
297 duration: start_time.elapsed(),
298 details,
299 }
300 }
301}
302
303async fn process_with_retry<F, Fut>(
305 project_id: &str,
306 processor: &F,
307 max_retries: usize,
308 base_delay: Duration,
309) -> (ItemResult, u32)
310where
311 F: Fn(String) -> Fut + Send + Sync,
312 Fut: Future<Output = ItemResult> + Send,
313{
314 let max_delay = Duration::from_secs(60);
315 let mut attempts = 0u32;
316
317 loop {
318 attempts += 1;
319 let result = processor(project_id.to_string()).await;
320
321 match &result {
322 ItemResult::Success | ItemResult::Skipped { .. } => {
323 return (result, attempts);
324 }
325 ItemResult::Failed { retryable, .. } => {
326 if !retryable || attempts as usize >= max_retries {
327 return (result, attempts);
328 }
329
330 let delay = exponential_backoff(attempts - 1, base_delay, max_delay);
332 tokio::time::sleep(delay).await;
333 }
334 }
335 }
336}
337
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU32;

    /// Builds a `ProcessItem` from borrowed strings.
    fn item(id: &str, name: Option<&str>) -> ProcessItem {
        ProcessItem {
            project_id: String::from(id),
            project_name: name.map(String::from),
        }
    }

    /// Every item succeeds on the first attempt.
    #[tokio::test]
    async fn test_execute_success() {
        let executor = BulkExecutor::new(BulkConfig::default());
        let items = vec![
            item("proj-1", Some("Project 1")),
            item("proj-2", Some("Project 2")),
        ];

        let result = executor
            .execute(
                Uuid::new_v4(),
                items,
                |_project_id| async { ItemResult::Success },
                |_progress| {},
            )
            .await;

        assert_eq!(result.total, 2);
        assert_eq!(result.completed, 2);
        assert_eq!(result.failed, 0);
        assert_eq!(result.skipped, 0);
    }

    /// Dry-run mode reports every item as skipped.
    #[tokio::test]
    async fn test_execute_dry_run() {
        let executor = BulkExecutor::new(BulkConfig {
            dry_run: true,
            ..Default::default()
        });

        let result = executor
            .execute(
                Uuid::new_v4(),
                vec![item("proj-1", None)],
                |_project_id| async { ItemResult::Success },
                |_progress| {},
            )
            .await;

        assert_eq!(result.total, 1);
        assert_eq!(result.skipped, 1);
        assert_eq!(result.completed, 0);
    }

    /// Two retryable failures followed by a success take three attempts.
    #[tokio::test]
    async fn test_execute_with_retries() {
        let executor = BulkExecutor::new(BulkConfig {
            max_retries: 3,
            retry_base_delay: Duration::from_millis(10),
            ..Default::default()
        });

        // Counts processor invocations across retries.
        let call_count = Arc::new(AtomicU32::new(0));

        let flaky_processor = move |_project_id: String| {
            let call_count = Arc::clone(&call_count);
            async move {
                let previous_calls = call_count.fetch_add(1, Ordering::SeqCst);
                if previous_calls < 2 {
                    ItemResult::Failed {
                        error: "temporary error".to_string(),
                        retryable: true,
                    }
                } else {
                    ItemResult::Success
                }
            }
        };

        let result = executor
            .execute(
                Uuid::new_v4(),
                vec![item("proj-1", None)],
                flaky_processor,
                |_progress| {},
            )
            .await;

        assert_eq!(result.completed, 1);
        assert_eq!(result.details[0].attempts, 3);
    }
}