ipfrs_storage/
batch.rs

//! Batch operation utilities for efficient bulk processing
//!
//! This module provides utilities for performing batch operations on block stores,
//! with concurrency control, chunked processing, and per-item error reporting.
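//!
//! # Example
//!
//! A rough usage sketch, assuming this module is exposed as `ipfrs_storage::batch`
//! and that `MemoryBlockStore` is re-exported at the crate root (as in the tests
//! below); run it inside an async context:
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! use bytes::Bytes;
//! use ipfrs_core::Block;
//! use ipfrs_storage::batch::{batch_put, BatchConfig};
//! use ipfrs_storage::MemoryBlockStore;
//!
//! let store = Arc::new(MemoryBlockStore::new());
//! let blocks = vec![Block::new(Bytes::from("hello")).unwrap()];
//!
//! let result = batch_put(store, blocks, BatchConfig::default()).await;
//! assert!(result.is_success());
//! ```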

use crate::traits::BlockStore;
use ipfrs_core::{Block, Cid};
use std::sync::Arc;
use tokio::sync::Semaphore;

/// Batch operation configuration
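///
/// # Example
///
/// A small sketch of picking a preset and enabling fail-fast behavior
/// (uses only the constructors defined below):
///
/// ```ignore
/// let config = BatchConfig::high_throughput().with_fail_fast(true);
/// assert_eq!(config.max_concurrency, 50);
/// assert!(config.fail_fast);
/// ```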
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum concurrent operations
    pub max_concurrency: usize,
    /// Batch size for chunking operations
    pub batch_size: usize,
    /// Whether to stop on first error or continue
    pub fail_fast: bool,
}

impl Default for BatchConfig {
    fn default() -> Self {
        Self {
            max_concurrency: 10,
            batch_size: 100,
            fail_fast: false,
        }
    }
}

impl BatchConfig {
    /// Create a new batch config with custom settings
    pub fn new(max_concurrency: usize, batch_size: usize) -> Self {
        Self {
            max_concurrency,
            batch_size,
            fail_fast: false,
        }
    }

    /// Set whether to fail fast on the first error
    pub fn with_fail_fast(mut self, fail_fast: bool) -> Self {
        self.fail_fast = fail_fast;
        self
    }

    /// Settings optimized for high throughput
    pub fn high_throughput() -> Self {
        Self {
            max_concurrency: 50,
            batch_size: 500,
            fail_fast: false,
        }
    }

    /// Settings optimized for low latency
    pub fn low_latency() -> Self {
        Self {
            max_concurrency: 20,
            batch_size: 50,
            fail_fast: false,
        }
    }

    /// Conservative settings for resource-constrained environments
    pub fn conservative() -> Self {
        Self {
            max_concurrency: 5,
            batch_size: 20,
            fail_fast: false,
        }
    }
}

/// Result of a batch operation
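///
/// # Example
///
/// A sketch of inspecting a result by hand (mirrors the `test_batch_result`
/// test at the bottom of this file):
///
/// ```ignore
/// let mut result = BatchResult::<i32>::new();
/// result.total = 4;
/// result.successful = vec![1, 2, 3];
/// result.failed = vec![(4, "error".to_string())];
///
/// assert!(!result.is_success());
/// assert_eq!(result.success_rate(), 0.75);
/// ```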
#[derive(Debug, Clone)]
pub struct BatchResult<T> {
    /// Successfully processed items
    pub successful: Vec<T>,
    /// Failed items with their errors
    pub failed: Vec<(T, String)>,
    /// Total number of items processed
    pub total: usize,
}

impl<T> BatchResult<T> {
    /// Create a new batch result
    pub fn new() -> Self {
        Self {
            successful: Vec::new(),
            failed: Vec::new(),
            total: 0,
        }
    }

    /// Check if all operations succeeded
    pub fn is_success(&self) -> bool {
        self.failed.is_empty()
    }

    /// Get success rate (0.0 to 1.0)
    pub fn success_rate(&self) -> f64 {
        if self.total == 0 {
            1.0
        } else {
            self.successful.len() as f64 / self.total as f64
        }
    }

    /// Get number of successful operations
    pub fn success_count(&self) -> usize {
        self.successful.len()
    }

    /// Get number of failed operations
    pub fn failure_count(&self) -> usize {
        self.failed.len()
    }
}

impl<T> Default for BatchResult<T> {
    fn default() -> Self {
        Self::new()
    }
}

/// Batch put blocks with concurrency control
///
/// Puts multiple blocks efficiently with configurable parallelism.
/// Returns a result indicating success/failure for each block.
pub async fn batch_put<S: BlockStore + Send + Sync + 'static>(
    store: Arc<S>,
    blocks: Vec<Block>,
    config: BatchConfig,
) -> BatchResult<Cid> {
    let mut result = BatchResult::new();
    result.total = blocks.len();

    let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
    let mut handles = Vec::new();

    for chunk in blocks.chunks(config.batch_size) {
        for block in chunk {
            let permit = semaphore.clone().acquire_owned().await.unwrap();
            let block = block.clone();
            let cid = *block.cid();
            let store = store.clone();

            let handle = tokio::spawn(async move {
                let _permit = permit; // Hold permit until task completes
                (cid, store.put(&block).await)
            });

            handles.push(handle);
        }

        // Wait for this chunk to complete
        for handle in handles.drain(..) {
            match handle.await {
                Ok((cid, Ok(_))) => result.successful.push(cid),
                Ok((cid, Err(e))) => {
                    result.failed.push((cid, e.to_string()));
                    if config.fail_fast {
                        return result;
                    }
                }
                Err(e) => {
                    // Task panicked or was cancelled
                    result
                        .failed
                        .push((Cid::default(), format!("Task error: {e}")));
                }
            }
        }
    }

    result
}

/// Batch get blocks with concurrency control
///
/// Retrieves multiple blocks efficiently with configurable parallelism.
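///
/// # Example
///
/// A rough sketch, assuming the same crate layout as the module-level example;
/// missing blocks land in `failed` as empty placeholder blocks with a
/// "Block not found" message:
///
/// ```ignore
/// let store = Arc::new(MemoryBlockStore::new());
/// let block = Block::new(Bytes::from("data")).unwrap();
/// let cid = *block.cid();
/// store.put(&block).await.unwrap();
///
/// let result = batch_get(store, vec![cid], BatchConfig::default()).await;
/// assert_eq!(result.success_count(), 1);
/// ```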
pub async fn batch_get<S: BlockStore + Send + Sync + 'static>(
    store: Arc<S>,
    cids: Vec<Cid>,
    config: BatchConfig,
) -> BatchResult<Block> {
    let mut result = BatchResult::new();
    result.total = cids.len();

    let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
    let mut handles = Vec::new();

    for chunk in cids.chunks(config.batch_size) {
        for cid in chunk {
            let permit = semaphore.clone().acquire_owned().await.unwrap();
            let cid = *cid;
            let store = store.clone();

            let handle = tokio::spawn(async move {
                let _permit = permit;
                (cid, store.get(&cid).await)
            });

            handles.push(handle);
        }

        // Wait for this chunk to complete
        for handle in handles.drain(..) {
            match handle.await {
                Ok((_cid, Ok(Some(block)))) => result.successful.push(block),
                Ok((cid, Ok(None))) => {
                    result.failed.push((
                        Block::from_parts(cid, bytes::Bytes::new()),
                        "Block not found".to_string(),
                    ));
                }
                Ok((cid, Err(e))) => {
                    result
                        .failed
                        .push((Block::from_parts(cid, bytes::Bytes::new()), e.to_string()));
                    if config.fail_fast {
                        return result;
                    }
                }
                Err(e) => {
                    result.failed.push((
                        Block::from_parts(Cid::default(), bytes::Bytes::new()),
                        format!("Task error: {e}"),
                    ));
                }
            }
        }
    }

    result
}

/// Batch delete blocks with concurrency control
pub async fn batch_delete<S: BlockStore + Send + Sync + 'static>(
    store: Arc<S>,
    cids: Vec<Cid>,
    config: BatchConfig,
) -> BatchResult<Cid> {
    let mut result = BatchResult::new();
    result.total = cids.len();

    let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
    let mut handles = Vec::new();

    for chunk in cids.chunks(config.batch_size) {
        for cid in chunk {
            let permit = semaphore.clone().acquire_owned().await.unwrap();
            let cid = *cid;
            let store = store.clone();

            let handle = tokio::spawn(async move {
                let _permit = permit;
                (cid, store.delete(&cid).await)
            });

            handles.push(handle);
        }

        // Wait for this chunk to complete
        for handle in handles.drain(..) {
            match handle.await {
                Ok((cid, Ok(_))) => result.successful.push(cid),
                Ok((cid, Err(e))) => {
                    result.failed.push((cid, e.to_string()));
                    if config.fail_fast {
                        return result;
                    }
                }
                Err(e) => {
                    result
                        .failed
                        .push((Cid::default(), format!("Task error: {e}")));
                }
            }
        }
    }

    result
}

/// Batch check existence with concurrency control
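///
/// # Example
///
/// A rough sketch, assuming the same crate layout as the module-level example;
/// each successful entry pairs a CID with whether it exists in the store:
///
/// ```ignore
/// let store = Arc::new(MemoryBlockStore::new());
/// let block = Block::new(Bytes::from("data")).unwrap();
/// store.put(&block).await.unwrap();
///
/// let result = batch_has(store, vec![*block.cid()], BatchConfig::default()).await;
/// for (_cid, exists) in result.successful {
///     assert!(exists);
/// }
/// ```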
pub async fn batch_has<S: BlockStore + Send + Sync + 'static>(
    store: Arc<S>,
    cids: Vec<Cid>,
    config: BatchConfig,
) -> BatchResult<(Cid, bool)> {
    let mut result = BatchResult::new();
    result.total = cids.len();

    let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
    let mut handles = Vec::new();

    for chunk in cids.chunks(config.batch_size) {
        for cid in chunk {
            let permit = semaphore.clone().acquire_owned().await.unwrap();
            let cid = *cid;
            let store = store.clone();

            let handle = tokio::spawn(async move {
                let _permit = permit;
                (cid, store.has(&cid).await)
            });

            handles.push(handle);
        }

        // Wait for this chunk to complete
        for handle in handles.drain(..) {
            match handle.await {
                Ok((cid, Ok(exists))) => result.successful.push((cid, exists)),
                Ok((cid, Err(e))) => {
                    result.failed.push(((cid, false), e.to_string()));
                    if config.fail_fast {
                        return result;
                    }
                }
                Err(e) => {
                    result
                        .failed
                        .push(((Cid::default(), false), format!("Task error: {e}")));
                }
            }
        }
    }

    result
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::MemoryBlockStore;
    use bytes::Bytes;

    #[tokio::test]
    async fn test_batch_put() {
        let store = Arc::new(MemoryBlockStore::new());
        let mut blocks = Vec::new();

        for i in 0..10 {
            let data = format!("block {}", i);
            let block = Block::new(Bytes::from(data)).unwrap();
            blocks.push(block);
        }

        let config = BatchConfig::default();
        let result = batch_put(store.clone(), blocks.clone(), config).await;

        assert!(result.is_success());
        assert_eq!(result.success_count(), 10);
        assert_eq!(result.failure_count(), 0);
        assert_eq!(result.success_rate(), 1.0);
    }

    #[tokio::test]
    async fn test_batch_get() {
        let store = Arc::new(MemoryBlockStore::new());
        let mut blocks = Vec::new();
        let mut cids = Vec::new();

        for i in 0..5 {
            let data = format!("block {}", i);
            let block = Block::new(Bytes::from(data)).unwrap();
            cids.push(*block.cid());
            store.put(&block).await.unwrap();
            blocks.push(block);
        }

        let config = BatchConfig::default();
        let result = batch_get(store.clone(), cids, config).await;

        assert!(result.is_success());
        assert_eq!(result.success_count(), 5);
    }

    #[tokio::test]
    async fn test_batch_has() {
        let store = Arc::new(MemoryBlockStore::new());
        let mut cids = Vec::new();

        for i in 0..5 {
            let data = format!("block {}", i);
            let block = Block::new(Bytes::from(data)).unwrap();
            cids.push(*block.cid());
            store.put(&block).await.unwrap();
        }

        let config = BatchConfig::default();
        let result = batch_has(store.clone(), cids, config).await;

        assert!(result.is_success());
        assert_eq!(result.success_count(), 5);

        // All blocks should exist
        for (_, exists) in result.successful {
            assert!(exists);
        }
    }

    #[tokio::test]
    async fn test_batch_delete() {
        let store = Arc::new(MemoryBlockStore::new());
        let mut cids = Vec::new();

        for i in 0..5 {
            let data = format!("block {}", i);
            let block = Block::new(Bytes::from(data)).unwrap();
            cids.push(*block.cid());
            store.put(&block).await.unwrap();
        }

        let config = BatchConfig::default();
        let result = batch_delete(store.clone(), cids.clone(), config).await;

        assert!(result.is_success());
        assert_eq!(result.success_count(), 5);

        // Verify blocks are deleted
        for cid in cids {
            assert!(!store.has(&cid).await.unwrap());
        }
    }

    #[test]
    fn test_batch_config_presets() {
        let high_throughput = BatchConfig::high_throughput();
        assert_eq!(high_throughput.max_concurrency, 50);
        assert_eq!(high_throughput.batch_size, 500);

        let low_latency = BatchConfig::low_latency();
        assert_eq!(low_latency.max_concurrency, 20);
        assert_eq!(low_latency.batch_size, 50);

        let conservative = BatchConfig::conservative();
        assert_eq!(conservative.max_concurrency, 5);
        assert_eq!(conservative.batch_size, 20);
    }

    #[test]
    fn test_batch_result() {
        let mut result = BatchResult::<i32>::new();
        result.total = 10;
        result.successful = vec![1, 2, 3, 4, 5];
        result.failed = vec![(6, "error".to_string())];

        assert!(!result.is_success());
        assert_eq!(result.success_count(), 5);
        assert_eq!(result.failure_count(), 1);
        assert_eq!(result.success_rate(), 0.5);
    }
}