use crate::error::{Error, Result};
use futures::Stream;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
use tracing::{debug, info, warn};

/// Configuration for bulk data processing.
#[derive(Debug, Clone)]
pub struct BulkProcessingConfig {
    /// Number of items to process per chunk.
    pub chunk_size: usize,
    /// Soft upper bound on estimated buffered memory, in bytes.
    pub max_memory_usage: usize,
    /// Maximum number of chunks to process concurrently (reserved; not yet
    /// used by the processors below).
    pub concurrency: usize,
    /// Whether memory may be reclaimed by evicting the oldest batch when the
    /// limit is reached (see `DataAggregator::compress_oldest_batch`).
    pub enable_compression: bool,
    /// Emit a progress report every this many processed items.
    pub progress_interval: usize,
}

impl Default for BulkProcessingConfig {
    fn default() -> Self {
        Self {
            chunk_size: 1000,
            max_memory_usage: 100 * 1024 * 1024, // 100 MiB
            concurrency: 4,
            enable_compression: true,
            progress_interval: 10_000,
        }
    }
}

/// Snapshot of bulk-processing progress.
#[derive(Debug, Clone)]
pub struct BulkProgress {
    pub processed_items: usize,
    pub total_items: Option<usize>,
    /// Items processed per second since processing started.
    pub processing_rate: f64,
    pub estimated_completion: Option<std::time::Duration>,
    /// Estimated memory currently buffered, in bytes.
    pub memory_usage: usize,
}

/// Buffers items in memory and processes them in fixed-size chunks.
pub struct ChunkProcessor<T> {
    config: BulkProcessingConfig,
    buffer: VecDeque<T>,
    processed_count: usize,
    start_time: std::time::Instant,
    last_progress_report: usize,
}

impl<T> ChunkProcessor<T>
where
    T: Clone + Send + Sync,
{
    pub fn new(config: BulkProcessingConfig) -> Self {
        Self {
            config,
            buffer: VecDeque::new(),
            processed_count: 0,
            start_time: std::time::Instant::now(),
            last_progress_report: 0,
        }
    }

    /// Adds items to the buffer, rejecting the batch if the estimated memory
    /// footprint would exceed the configured limit.
    pub fn add_items(&mut self, items: Vec<T>) -> Result<()> {
        let estimated_memory = self.estimate_memory_usage(&items);
        if estimated_memory > self.config.max_memory_usage {
            return Err(Error::Custom(
                "Memory limit exceeded. Consider reducing chunk size.".to_string(),
            ));
        }

        self.buffer.extend(items);
        Ok(())
    }

    /// Drains the buffer in chunks of `config.chunk_size`, applying
    /// `processor` to each chunk and collecting the results.
    pub async fn process_chunks<F, Fut, R>(&mut self, mut processor: F) -> Result<Vec<R>>
    where
        F: FnMut(Vec<T>) -> Fut,
        Fut: std::future::Future<Output = Result<R>>,
        R: Send,
    {
        let mut results = Vec::new();

        while !self.buffer.is_empty() {
            let chunk_size = std::cmp::min(self.config.chunk_size, self.buffer.len());
            let chunk: Vec<T> = self.buffer.drain(..chunk_size).collect();

            match processor(chunk).await {
                Ok(result) => {
                    results.push(result);
                    self.processed_count += chunk_size;

                    if self.processed_count - self.last_progress_report
                        >= self.config.progress_interval
                    {
                        self.report_progress();
                        self.last_progress_report = self.processed_count;
                    }
                }
                Err(e) => {
                    warn!("Chunk processing failed: {}", e);
                    return Err(e);
                }
            }
        }

        info!("Completed processing {} items", self.processed_count);
        Ok(results)
    }

    fn estimate_memory_usage(&self, items: &[T]) -> usize {
        // Rough heuristic: the shallow size of each item plus a flat 1 KiB
        // allowance for heap data it may own.
        let total_items = self.buffer.len() + items.len();
        (std::mem::size_of::<T>() + 1024) * total_items
    }

    fn report_progress(&self) {
        let elapsed = self.start_time.elapsed();
        let rate = self.processed_count as f64 / elapsed.as_secs_f64();

        debug!(
            "Processed {} items, rate: {:.2} items/sec, elapsed: {:?}",
            self.processed_count, rate, elapsed
        );
    }

    /// Returns a snapshot of current progress. The total and ETA are unknown
    /// to this processor, so those fields are `None`.
    pub fn get_progress(&self) -> BulkProgress {
        let elapsed = self.start_time.elapsed();
        let rate = if elapsed.as_secs_f64() > 0.0 {
            self.processed_count as f64 / elapsed.as_secs_f64()
        } else {
            0.0
        };

        BulkProgress {
            processed_items: self.processed_count,
            total_items: None,
            processing_rate: rate,
            estimated_completion: None,
            memory_usage: self.estimate_memory_usage(&[]),
        }
    }
}

/// Accepts items through an unbounded channel and processes them in chunks.
pub struct StreamingProcessor<T> {
    /// Template sender handed out via `get_sender`; dropped when
    /// `process_stream` starts so the channel can close once all external
    /// senders are gone.
    sender: Option<mpsc::UnboundedSender<T>>,
    receiver: mpsc::UnboundedReceiver<T>,
    config: BulkProcessingConfig,
}

impl<T> StreamingProcessor<T>
where
    T: Send + 'static,
{
    pub fn new(config: BulkProcessingConfig) -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();

        Self {
            sender: Some(sender),
            receiver,
            config,
        }
    }

    /// Returns a sender for feeding items into the stream.
    ///
    /// # Panics
    ///
    /// Panics if called after `process_stream` has started.
    pub fn get_sender(&self) -> mpsc::UnboundedSender<T> {
        self.sender
            .as_ref()
            .expect("process_stream has already started")
            .clone()
    }

    /// Receives items until all senders are dropped, processing them in
    /// chunks of `config.chunk_size` and collecting the results.
    pub async fn process_stream<F, Fut, R>(&mut self, mut processor: F) -> Result<Vec<R>>
    where
        F: FnMut(Vec<T>) -> Fut,
        Fut: std::future::Future<Output = Result<R>>,
        R: Send,
    {
        // Drop the internal sender so `recv` returns `None` once every sender
        // obtained via `get_sender` has been dropped; otherwise this loop
        // would never terminate.
        self.sender.take();

        let mut results = Vec::new();
        let mut buffer = Vec::with_capacity(self.config.chunk_size);

        while let Some(item) = self.receiver.recv().await {
            buffer.push(item);

            if buffer.len() >= self.config.chunk_size {
                let chunk =
                    std::mem::replace(&mut buffer, Vec::with_capacity(self.config.chunk_size));
                results.push(processor(chunk).await?);
            }
        }

        // Process whatever is left over as a final, short chunk.
        if !buffer.is_empty() {
            results.push(processor(buffer).await?);
        }

        Ok(results)
    }
}

/// A `Stream` that yields an owned `Vec<T>` in fixed-size chunks.
pub struct BulkDataStream<T> {
    data: VecDeque<T>,
    chunk_size: usize,
}

impl<T> BulkDataStream<T> {
    pub fn new(data: Vec<T>, chunk_size: usize) -> Self {
        Self {
            data: VecDeque::from(data),
            chunk_size,
        }
    }
}

impl<T> Stream for BulkDataStream<T>
where
    T: Unpin,
{
    type Item = Vec<T>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.data.is_empty() {
            return Poll::Ready(None);
        }

        // All data is already in memory, so each chunk is ready immediately.
        let chunk_size = std::cmp::min(self.chunk_size, self.data.len());
        let chunk: Vec<T> = self.data.drain(..chunk_size).collect();

        Poll::Ready(Some(chunk))
    }
}

/// Accumulates batches in memory up to a configurable limit.
pub struct DataAggregator<T> {
    config: BulkProcessingConfig,
    temp_storage: Vec<Vec<T>>,
    total_items: usize,
}

impl<T> DataAggregator<T>
where
    T: Clone + Send + Sync,
{
    pub fn new(config: BulkProcessingConfig) -> Self {
        Self {
            config,
            temp_storage: Vec::new(),
            total_items: 0,
        }
    }

    /// Adds a batch, evicting the oldest stored batch first if the estimated
    /// memory would exceed the limit (or failing if eviction is disabled).
    pub fn add_batch(&mut self, batch: Vec<T>) -> Result<()> {
        let batch_size = batch.len();

        let estimated_new_memory =
            self.estimate_total_memory() + self.estimate_batch_memory(&batch);

        if estimated_new_memory > self.config.max_memory_usage {
            if self.config.enable_compression {
                // Frees at most one batch; the new batch is stored regardless.
                self.compress_oldest_batch()?;
            } else {
                return Err(Error::Custom(
                    "Memory limit exceeded and compression disabled".to_string(),
                ));
            }
        }

        self.temp_storage.push(batch);
        self.total_items += batch_size;
        Ok(())
    }

    /// Moves all stored items out in insertion order, emptying the aggregator.
    pub fn get_all_data(&mut self) -> Vec<T> {
        let mut all_data = Vec::with_capacity(self.total_items);

        for batch in self.temp_storage.drain(..) {
            all_data.extend(batch);
        }

        self.total_items = 0;
        all_data
    }

    /// Flattens all stored batches and re-chunks them to `chunk_size`,
    /// emptying the aggregator. The final chunk may be shorter.
    pub fn drain_chunks(&mut self, chunk_size: usize) -> Vec<Vec<T>> {
        let mut chunks = Vec::new();
        let mut current_chunk = Vec::with_capacity(chunk_size);

        for batch in self.temp_storage.drain(..) {
            for item in batch {
                current_chunk.push(item);

                if current_chunk.len() >= chunk_size {
                    chunks.push(std::mem::replace(
                        &mut current_chunk,
                        Vec::with_capacity(chunk_size),
                    ));
                }
            }
        }

        if !current_chunk.is_empty() {
            chunks.push(current_chunk);
        }

        self.total_items = 0;
        chunks
    }

    fn estimate_batch_memory(&self, batch: &[T]) -> usize {
        // Same heuristic as ChunkProcessor: shallow size plus a flat 1 KiB
        // per-item allowance for owned heap data.
        (std::mem::size_of::<T>() + 1024) * batch.len()
    }

    fn estimate_total_memory(&self) -> usize {
        self.temp_storage
            .iter()
            .map(|batch| self.estimate_batch_memory(batch))
            .sum()
    }

    /// Frees memory by dropping the oldest batch. Despite the name, no actual
    /// compression is performed yet, so the evicted items are lost.
    fn compress_oldest_batch(&mut self) -> Result<()> {
        if self.temp_storage.is_empty() {
            return Ok(());
        }

        warn!("Memory limit reached, removing oldest batch");
        let removed_batch = self.temp_storage.remove(0);
        self.total_items -= removed_batch.len();

        Ok(())
    }

    /// Returns `(total_items, batch_count, estimated_memory_bytes)`.
    pub fn get_stats(&self) -> (usize, usize, usize) {
        (
            self.total_items,
            self.temp_storage.len(),
            self.estimate_total_memory(),
        )
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_chunk_processor() {
        let config = BulkProcessingConfig {
            chunk_size: 3,
            ..Default::default()
        };

        let mut processor = ChunkProcessor::new(config);

        let items = vec![1, 2, 3, 4, 5, 6, 7];
        processor.add_items(items).unwrap();

        let results = processor
            .process_chunks(|chunk| async move { Ok(chunk.len()) })
            .await
            .unwrap();

        // Seven items in chunks of three: 3 + 3 + 1.
        assert_eq!(results, vec![3, 3, 1]);
    }
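
    // A minimal sketch of driving StreamingProcessor end to end. It relies on
    // process_stream dropping its internal sender, so the receive loop ends
    // once the clone obtained via get_sender goes out of scope.
    #[tokio::test]
    async fn test_streaming_processor() {
        let config = BulkProcessingConfig {
            chunk_size: 2,
            ..Default::default()
        };

        let mut processor = StreamingProcessor::new(config);
        let sender = processor.get_sender();

        // Feed items from a separate task; the sender is dropped when the
        // task finishes, which closes the channel.
        tokio::spawn(async move {
            for i in 1..=5 {
                sender.send(i).unwrap();
            }
        });

        let results = processor
            .process_stream(|chunk| async move { Ok(chunk.len()) })
            .await
            .unwrap();

        // Five items in chunks of two: 2 + 2 + 1.
        assert_eq!(results, vec![2, 2, 1]);
    }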

    #[test]
    fn test_data_aggregator() {
        let config = BulkProcessingConfig::default();
        let mut aggregator = DataAggregator::new(config);

        aggregator.add_batch(vec![1, 2, 3]).unwrap();
        aggregator.add_batch(vec![4, 5]).unwrap();

        let all_data = aggregator.get_all_data();
        assert_eq!(all_data, vec![1, 2, 3, 4, 5]);
    }
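
    // Sketch of drain_chunks: stored batches are flattened and re-chunked to
    // the requested size, with a shorter final chunk for the remainder, and
    // the aggregator is left empty.
    #[test]
    fn test_drain_chunks() {
        let config = BulkProcessingConfig::default();
        let mut aggregator = DataAggregator::new(config);

        aggregator.add_batch(vec![1, 2, 3]).unwrap();
        aggregator.add_batch(vec![4, 5]).unwrap();

        let chunks = aggregator.drain_chunks(2);
        assert_eq!(chunks, vec![vec![1, 2], vec![3, 4], vec![5]]);

        let (total_items, batch_count, _memory) = aggregator.get_stats();
        assert_eq!(total_items, 0);
        assert_eq!(batch_count, 0);
    }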

    #[tokio::test]
    async fn test_bulk_data_stream() {
        use futures::StreamExt;

        let data = vec![1, 2, 3, 4, 5, 6, 7];
        let stream = BulkDataStream::new(data, 3);

        let chunks: Vec<Vec<i32>> = stream.collect().await;
        assert_eq!(chunks, vec![vec![1, 2, 3], vec![4, 5, 6], vec![7]]);
    }
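
    // Sketch of the memory guard in add_items: with the limit set below the
    // estimator's flat 1 KiB per-item allowance, even a single item should be
    // rejected.
    #[test]
    fn test_add_items_memory_limit() {
        let config = BulkProcessingConfig {
            max_memory_usage: 512,
            ..Default::default()
        };

        let mut processor = ChunkProcessor::new(config);
        assert!(processor.add_items(vec![1u32]).is_err());
    }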
}