Skip to main content

process_bulk

Function process_bulk 

Source
pub async fn process_bulk<I, D, T, F, Fut, P>(
    items: Vec<(I, D)>,
    processor: F,
    progress_callback: P,
) -> BulkResult<I, T>
where I: Clone + Display + Send + 'static, D: Clone + Send + 'static, T: Send + 'static, F: Fn((I, D)) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<Option<T>>> + Send, P: Fn(usize, usize, &str) + Send + Sync + 'static,
Expand description

Process a collection of items concurrently with retry logic and progress tracking.

§Type Parameters

  • I - Item identifier type (must be Clone + Display for progress messages)
  • D - Item data type (must be Clone + Send)
  • T - Result type for successful processing
  • F - Async processor function type
  • P - Progress callback function type

§Arguments

  • items - Collection of (identifier, data) pairs to process
  • processor - Async function that processes a single item, returning:
    • Ok(Some(T)) for successful processing
    • Ok(None) for skipped items
    • Err(e) for failures (will retry if retryable)
  • progress_callback - Called before processing each item with (current, total, action_message)

§Returns

A BulkResult containing counts and detailed outcomes for all items.

§Concurrency

Uses buffer_unordered(5) to process up to 5 items concurrently, respecting rate limits and avoiding overwhelming external APIs.

§Retry Logic

Automatically retries transient failures (network errors, rate limits) using exponential backoff. Non-retryable errors (validation, permissions) fail immediately.

§Example

use aptu_core::bulk::{process_bulk, BulkResult};
use anyhow::Result;

async fn process_item(id: &str) -> Result<Option<String>> {
    // Process the item...
    Ok(Some(format!("Processed {}", id)))
}

let items = vec![
    ("item1".to_string(), "data1"),
    ("item2".to_string(), "data2"),
];

let result = process_bulk(
    items,
    |(_id, data)| async move { process_item(data).await },
    |current, total, action| {
        println!("[{}/{}] {}", current, total, action);
    },
).await;

println!("Succeeded: {}, Failed: {}, Skipped: {}",
    result.succeeded, result.failed, result.skipped);