1use crate::traits::BlockStore;
7use ipfrs_core::{Block, Cid};
8use std::sync::Arc;
9use tokio::sync::Semaphore;
10
/// Tuning knobs shared by the batch operations in this module.
#[derive(Clone, Debug)]
pub struct BatchConfig {
    /// Number of permits given to the semaphore that gates spawned tasks.
    pub max_concurrency: usize,
    /// Number of items that are spawned and then awaited together as one chunk.
    pub batch_size: usize,
    /// When `true`, a batch operation returns as soon as it sees a store error.
    pub fail_fast: bool,
}

impl Default for BatchConfig {
    /// Returns the default settings: 10 concurrent operations, chunks of 100,
    /// fail-fast disabled.
    fn default() -> Self {
        Self::new(10, 100)
    }
}

impl BatchConfig {
    /// Builds a configuration with the given limits and fail-fast disabled.
    pub fn new(max_concurrency: usize, batch_size: usize) -> Self {
        let fail_fast = false;
        Self {
            max_concurrency,
            batch_size,
            fail_fast,
        }
    }

    /// Consumes the configuration and returns it with `fail_fast` replaced.
    pub fn with_fail_fast(self, fail_fast: bool) -> Self {
        Self { fail_fast, ..self }
    }

    /// Preset with 50 concurrent operations and chunks of 500.
    pub fn high_throughput() -> Self {
        Self::new(50, 500)
    }

    /// Preset with 20 concurrent operations and chunks of 50.
    pub fn low_latency() -> Self {
        Self::new(20, 50)
    }

    /// Preset with 5 concurrent operations and chunks of 20.
    pub fn conservative() -> Self {
        Self::new(5, 20)
    }
}
75
/// Outcome of a batch operation, split into successes and failures.
#[derive(Clone, Debug)]
pub struct BatchResult<T> {
    /// Items whose operation succeeded.
    pub successful: Vec<T>,
    /// Items whose operation failed, each paired with an error message.
    pub failed: Vec<(T, String)>,
    /// Number of items the batch was asked to process.
    pub total: usize,
}

impl<T> BatchResult<T> {
    /// Creates an empty result with `total` set to zero.
    pub fn new() -> Self {
        let (successful, failed) = (Vec::new(), Vec::new());
        Self {
            successful,
            failed,
            total: 0,
        }
    }

    /// Returns `true` when no failure has been recorded.
    pub fn is_success(&self) -> bool {
        self.failure_count() == 0
    }

    /// Fraction of `total` that succeeded; an empty batch (`total == 0`)
    /// reports `1.0`.
    pub fn success_rate(&self) -> f64 {
        match self.total {
            0 => 1.0,
            total => self.success_count() as f64 / total as f64,
        }
    }

    /// Number of recorded successes.
    pub fn success_count(&self) -> usize {
        self.successful.len()
    }

    /// Number of recorded failures.
    pub fn failure_count(&self) -> usize {
        self.failed.len()
    }
}

impl<T> Default for BatchResult<T> {
    /// Same as [`BatchResult::new`]; does not require `T: Default`.
    fn default() -> Self {
        Self::new()
    }
}
127
128pub async fn batch_put<S: BlockStore + Send + Sync + 'static>(
133 store: Arc<S>,
134 blocks: Vec<Block>,
135 config: BatchConfig,
136) -> BatchResult<Cid> {
137 let mut result = BatchResult::new();
138 result.total = blocks.len();
139
140 let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
141 let mut handles = Vec::new();
142
143 for chunk in blocks.chunks(config.batch_size) {
144 for block in chunk {
145 let permit = semaphore.clone().acquire_owned().await.unwrap();
146 let block = block.clone();
147 let cid = *block.cid();
148 let store = store.clone();
149
150 let handle = tokio::spawn(async move {
151 let _permit = permit; (cid, store.put(&block).await)
153 });
154
155 handles.push(handle);
156 }
157
158 for handle in handles.drain(..) {
160 match handle.await {
161 Ok((cid, Ok(_))) => result.successful.push(cid),
162 Ok((cid, Err(e))) => {
163 result.failed.push((cid, e.to_string()));
164 if config.fail_fast {
165 return result;
166 }
167 }
168 Err(e) => {
169 result
171 .failed
172 .push((Cid::default(), format!("Task error: {e}")));
173 }
174 }
175 }
176 }
177
178 result
179}
180
181pub async fn batch_get<S: BlockStore + Send + Sync + 'static>(
185 store: Arc<S>,
186 cids: Vec<Cid>,
187 config: BatchConfig,
188) -> BatchResult<Block> {
189 let mut result = BatchResult::new();
190 result.total = cids.len();
191
192 let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
193 let mut handles = Vec::new();
194
195 for chunk in cids.chunks(config.batch_size) {
196 for cid in chunk {
197 let permit = semaphore.clone().acquire_owned().await.unwrap();
198 let cid = *cid;
199 let store = store.clone();
200
201 let handle = tokio::spawn(async move {
202 let _permit = permit;
203 (cid, store.get(&cid).await)
204 });
205
206 handles.push(handle);
207 }
208
209 for handle in handles.drain(..) {
211 match handle.await {
212 Ok((_cid, Ok(Some(block)))) => result.successful.push(block),
213 Ok((cid, Ok(None))) => {
214 result.failed.push((
215 Block::from_parts(cid, bytes::Bytes::new()),
216 "Block not found".to_string(),
217 ));
218 }
219 Ok((cid, Err(e))) => {
220 result
221 .failed
222 .push((Block::from_parts(cid, bytes::Bytes::new()), e.to_string()));
223 if config.fail_fast {
224 return result;
225 }
226 }
227 Err(e) => {
228 result.failed.push((
229 Block::from_parts(Cid::default(), bytes::Bytes::new()),
230 format!("Task error: {e}"),
231 ));
232 }
233 }
234 }
235 }
236
237 result
238}
239
240pub async fn batch_delete<S: BlockStore + Send + Sync + 'static>(
242 store: Arc<S>,
243 cids: Vec<Cid>,
244 config: BatchConfig,
245) -> BatchResult<Cid> {
246 let mut result = BatchResult::new();
247 result.total = cids.len();
248
249 let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
250 let mut handles = Vec::new();
251
252 for chunk in cids.chunks(config.batch_size) {
253 for cid in chunk {
254 let permit = semaphore.clone().acquire_owned().await.unwrap();
255 let cid = *cid;
256 let store = store.clone();
257
258 let handle = tokio::spawn(async move {
259 let _permit = permit;
260 (cid, store.delete(&cid).await)
261 });
262
263 handles.push(handle);
264 }
265
266 for handle in handles.drain(..) {
268 match handle.await {
269 Ok((cid, Ok(_))) => result.successful.push(cid),
270 Ok((cid, Err(e))) => {
271 result.failed.push((cid, e.to_string()));
272 if config.fail_fast {
273 return result;
274 }
275 }
276 Err(e) => {
277 result
278 .failed
279 .push((Cid::default(), format!("Task error: {e}")));
280 }
281 }
282 }
283 }
284
285 result
286}
287
288pub async fn batch_has<S: BlockStore + Send + Sync + 'static>(
290 store: Arc<S>,
291 cids: Vec<Cid>,
292 config: BatchConfig,
293) -> BatchResult<(Cid, bool)> {
294 let mut result = BatchResult::new();
295 result.total = cids.len();
296
297 let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
298 let mut handles = Vec::new();
299
300 for chunk in cids.chunks(config.batch_size) {
301 for cid in chunk {
302 let permit = semaphore.clone().acquire_owned().await.unwrap();
303 let cid = *cid;
304 let store = store.clone();
305
306 let handle = tokio::spawn(async move {
307 let _permit = permit;
308 (cid, store.has(&cid).await)
309 });
310
311 handles.push(handle);
312 }
313
314 for handle in handles.drain(..) {
316 match handle.await {
317 Ok((cid, Ok(exists))) => result.successful.push((cid, exists)),
318 Ok((cid, Err(e))) => {
319 result.failed.push(((cid, false), e.to_string()));
320 if config.fail_fast {
321 return result;
322 }
323 }
324 Err(e) => {
325 result
326 .failed
327 .push(((Cid::default(), false), format!("Task error: {e}")));
328 }
329 }
330 }
331 }
332
333 result
334}
335
#[cfg(test)]
mod tests {
    use super::*;
    use crate::MemoryBlockStore;
    use bytes::Bytes;

    /// Builds `count` distinct blocks whose payloads are `"block 0"`, `"block 1"`, ...
    fn sample_blocks(count: usize) -> Vec<Block> {
        (0..count)
            .map(|i| Block::new(Bytes::from(format!("block {i}"))).unwrap())
            .collect()
    }

    /// Creates an in-memory store pre-populated with `count` sample blocks and
    /// returns it together with the CIDs of those blocks.
    async fn seeded_store(count: usize) -> (Arc<MemoryBlockStore>, Vec<Cid>) {
        let store = Arc::new(MemoryBlockStore::new());
        let mut cids = Vec::with_capacity(count);
        for block in sample_blocks(count) {
            store.put(&block).await.unwrap();
            cids.push(*block.cid());
        }
        (store, cids)
    }

    #[tokio::test]
    async fn test_batch_put() {
        let store = Arc::new(MemoryBlockStore::new());
        let blocks = sample_blocks(10);

        let outcome = batch_put(Arc::clone(&store), blocks, BatchConfig::default()).await;

        assert!(outcome.is_success());
        assert_eq!(outcome.success_count(), 10);
        assert_eq!(outcome.failure_count(), 0);
        assert_eq!(outcome.success_rate(), 1.0);
    }

    #[tokio::test]
    async fn test_batch_get() {
        let (store, cids) = seeded_store(5).await;

        let outcome = batch_get(store, cids, BatchConfig::default()).await;

        assert!(outcome.is_success());
        assert_eq!(outcome.success_count(), 5);
    }

    #[tokio::test]
    async fn test_batch_has() {
        let (store, cids) = seeded_store(5).await;

        let outcome = batch_has(store, cids, BatchConfig::default()).await;

        assert!(outcome.is_success());
        assert_eq!(outcome.success_count(), 5);

        // Every stored block must be reported as present.
        for (_, present) in &outcome.successful {
            assert!(*present);
        }
    }

    #[tokio::test]
    async fn test_batch_delete() {
        let (store, cids) = seeded_store(5).await;

        let outcome = batch_delete(Arc::clone(&store), cids.clone(), BatchConfig::default()).await;

        assert!(outcome.is_success());
        assert_eq!(outcome.success_count(), 5);

        // None of the deleted blocks may remain in the store.
        for cid in &cids {
            let still_present = store.has(cid).await.unwrap();
            assert!(!still_present);
        }
    }

    #[test]
    fn test_batch_config_presets() {
        // (preset, expected max_concurrency, expected batch_size)
        let expectations = [
            (BatchConfig::high_throughput(), 50, 500),
            (BatchConfig::low_latency(), 20, 50),
            (BatchConfig::conservative(), 5, 20),
        ];

        for (preset, max_concurrency, batch_size) in expectations {
            assert_eq!(preset.max_concurrency, max_concurrency);
            assert_eq!(preset.batch_size, batch_size);
        }
    }

    #[test]
    fn test_batch_result() {
        let outcome = BatchResult::<i32> {
            successful: vec![1, 2, 3, 4, 5],
            failed: vec![(6, "error".to_string())],
            total: 10,
        };

        assert!(!outcome.is_success());
        assert_eq!(outcome.success_count(), 5);
        assert_eq!(outcome.failure_count(), 1);
        assert_eq!(outcome.success_rate(), 0.5);
    }
}