aptu_core/
bulk.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Generic bulk processing with concurrent execution and retry logic.
4//!
5//! This module provides a reusable pattern for processing collections of items
6//! concurrently with automatic retry on transient failures and progress tracking.
7//! It's designed to work across all platforms (CLI, iOS/FFI, MCP) without any
8//! CLI-specific dependencies.
9
10use std::fmt::Display;
11
12use anyhow::Result;
13use backon::Retryable;
14use futures::{StreamExt, stream};
15
16use crate::{is_retryable_anyhow, retry_backoff};
17
/// Outcome of processing a single item in a bulk operation.
///
/// Derives `PartialEq`/`Eq` (when `T` supports them) so outcomes can be
/// compared directly, e.g. with `assert_eq!` in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BulkOutcome<T> {
    /// Item was processed successfully with a result.
    Success(T),
    /// Item was skipped (e.g., already processed); carries a descriptive message.
    Skipped(String),
    /// Item processing failed; carries the error message.
    Failed(String),
}

/// Aggregated result of a bulk processing operation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BulkResult<I, T> {
    /// Number of items processed successfully.
    pub succeeded: usize,
    /// Number of items that failed processing.
    pub failed: usize,
    /// Number of items that were skipped.
    pub skipped: usize,
    /// Detailed outcomes for each item as `(identifier, outcome)` pairs.
    pub outcomes: Vec<(I, BulkOutcome<T>)>,
}

// Implemented by hand rather than derived: `#[derive(Default)]` would add
// `I: Default` and `T: Default` bounds, which an empty result does not need.
impl<I, T> Default for BulkResult<I, T> {
    fn default() -> Self {
        Self {
            succeeded: 0,
            failed: 0,
            skipped: 0,
            outcomes: Vec::new(),
        }
    }
}
52
53/// Process a collection of items concurrently with retry logic and progress tracking.
54///
55/// # Type Parameters
56///
57/// * `I` - Item identifier type (must be Clone + Display for progress messages)
58/// * `D` - Item data type (must be Clone + Send)
59/// * `T` - Result type for successful processing
60/// * `F` - Async processor function type
61/// * `P` - Progress callback function type
62///
63/// # Arguments
64///
65/// * `items` - Collection of (identifier, data) pairs to process
66/// * `processor` - Async function that processes a single item, returning:
67///   - `Ok(Some(T))` for successful processing
68///   - `Ok(None)` for skipped items
69///   - `Err(e)` for failures (will retry if retryable)
70/// * `progress_callback` - Called before processing each item with (current, total, `action_message`)
71///
72/// # Returns
73///
74/// A `BulkResult` containing counts and detailed outcomes for all items.
75///
76/// # Concurrency
77///
78/// Uses `buffer_unordered(5)` to process up to 5 items concurrently, respecting
79/// rate limits and avoiding overwhelming external APIs.
80///
81/// # Retry Logic
82///
83/// Automatically retries transient failures (network errors, rate limits) using
84/// exponential backoff. Non-retryable errors (validation, permissions) fail immediately.
85///
86/// # Example
87///
88/// ```rust,no_run
89/// use aptu_core::bulk::{process_bulk, BulkResult};
90/// use anyhow::Result;
91///
92/// async fn process_item(id: &str) -> Result<Option<String>> {
93///     // Process the item...
94///     Ok(Some(format!("Processed {}", id)))
95/// }
96///
97/// # async fn example() -> Result<()> {
98/// let items = vec![
99///     ("item1".to_string(), "data1"),
100///     ("item2".to_string(), "data2"),
101/// ];
102///
103/// let result = process_bulk(
104///     items,
105///     |(_id, data)| async move { process_item(data).await },
106///     |current, total, action| {
107///         println!("[{}/{}] {}", current, total, action);
108///     },
109/// ).await;
110///
111/// println!("Succeeded: {}, Failed: {}, Skipped: {}",
112///     result.succeeded, result.failed, result.skipped);
113/// # Ok(())
114/// # }
115/// ```
116pub async fn process_bulk<I, D, T, F, Fut, P>(
117    items: Vec<(I, D)>,
118    processor: F,
119    progress_callback: P,
120) -> BulkResult<I, T>
121where
122    I: Clone + Display + Send + 'static,
123    D: Clone + Send + 'static,
124    T: Send + 'static,
125    F: Fn((I, D)) -> Fut + Send + Sync + 'static,
126    Fut: std::future::Future<Output = Result<Option<T>>> + Send,
127    P: Fn(usize, usize, &str) + Send + Sync + 'static,
128{
129    let total = items.len();
130    let progress_callback = std::sync::Arc::new(progress_callback);
131    let processor = std::sync::Arc::new(processor);
132
133    // Process items concurrently with buffer_unordered(5) for rate limit awareness
134    let mut tasks = Vec::new();
135    for (idx, (id, data)) in items.into_iter().enumerate() {
136        let id_clone = id.clone();
137        let data_clone = data.clone();
138        let progress_callback = progress_callback.clone();
139        let processor = processor.clone();
140
141        let task = async move {
142            // Call progress callback before processing
143            progress_callback(idx + 1, total, &format!("Processing {id}"));
144
145            // Process with retry logic
146            let id_for_retry = id_clone.clone();
147            let data_for_retry = data_clone.clone();
148            let result = (|| {
149                let processor = processor.clone();
150                let id = id_for_retry.clone();
151                let data = data_for_retry.clone();
152                async move { processor((id, data)).await }
153            })
154            .retry(retry_backoff())
155            .when(is_retryable_anyhow)
156            .notify(|err, dur| {
157                tracing::warn!(
158                    error = %err,
159                    delay_ms = dur.as_millis(),
160                    item = %id_clone,
161                    "Retrying after transient failure"
162                );
163            })
164            .await;
165
166            (id_clone, result)
167        };
168
169        tasks.push(task);
170    }
171
172    let outcomes = stream::iter(tasks)
173        .buffer_unordered(5)
174        .collect::<Vec<_>>()
175        .await;
176
177    // Categorize outcomes and build result
178    let mut bulk_result = BulkResult::default();
179
180    for (id, result) in outcomes {
181        match result {
182            Ok(Some(value)) => {
183                bulk_result.succeeded += 1;
184                bulk_result.outcomes.push((id, BulkOutcome::Success(value)));
185            }
186            Ok(None) => {
187                bulk_result.skipped += 1;
188                bulk_result
189                    .outcomes
190                    .push((id, BulkOutcome::Skipped("Skipped".to_string())));
191            }
192            Err(e) => {
193                bulk_result.failed += 1;
194                bulk_result
195                    .outcomes
196                    .push((id, BulkOutcome::Failed(e.to_string())));
197            }
198        }
199    }
200
201    bulk_result
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Every item succeeds, so only the success counter should move.
    #[tokio::test]
    async fn test_successful_processing() {
        let items: Vec<(String, i32)> = (1..=3).map(|n| (format!("item{n}"), n)).collect();

        let result = process_bulk(
            items,
            |(id, value)| async move { Ok(Some(format!("{}: {}", id, value * 2))) },
            |_, _, _| {},
        )
        .await;

        assert_eq!((result.succeeded, result.failed, result.skipped), (3, 0, 0));
        assert_eq!(result.outcomes.len(), 3);
    }

    /// One item each for the success, skip and failure paths.
    #[tokio::test]
    async fn test_mixed_outcomes() {
        let items: Vec<(String, i32)> = ["success", "skip", "fail"]
            .iter()
            .zip(1..)
            .map(|(name, n)| (name.to_string(), n))
            .collect();

        let result = process_bulk(
            items,
            |(id, _value)| async move {
                if id == "success" {
                    Ok(Some("done".to_string()))
                } else if id == "skip" {
                    Ok(None)
                } else if id == "fail" {
                    Err(anyhow::anyhow!("Processing failed"))
                } else {
                    unreachable!()
                }
            },
            |_, _, _| {},
        )
        .await;

        assert_eq!((result.succeeded, result.failed, result.skipped), (1, 1, 1));
        assert_eq!(result.outcomes.len(), 3);
    }

    /// The progress callback fires once per item with 1-based positions.
    #[tokio::test]
    async fn test_progress_callback_invocation() {
        let items = vec![("item1".to_string(), 1), ("item2".to_string(), 2)];

        let recorded = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&recorded);

        let _ = process_bulk(
            items,
            |(_id, _value)| async move { Ok(Some("done".to_string())) },
            move |current, total, action| {
                sink.lock().unwrap().push((current, total, action.to_string()));
            },
        )
        .await;

        let calls = recorded.lock().unwrap();
        let positions: Vec<(usize, usize)> =
            calls.iter().map(|(current, total, _)| (*current, *total)).collect();
        assert_eq!(positions, vec![(1, 2), (2, 2)]);
    }
}