fmp_rs/bulk_processing.rs

//! Memory-optimized bulk data processing for large datasets.

use crate::error::{Error, Result};
use futures::Stream;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
use tracing::{debug, info, warn};

/// Configuration for bulk data processing
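///
/// A minimal construction sketch (values are illustrative, not tuned
/// recommendations):
///
/// ```ignore
/// let config = BulkProcessingConfig {
///     chunk_size: 500, // smaller chunks to bound peak memory
///     ..Default::default()
/// };
/// ```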
#[derive(Debug, Clone)]
pub struct BulkProcessingConfig {
    /// Chunk size for processing large datasets
    pub chunk_size: usize,
    /// Maximum memory usage in bytes
    pub max_memory_usage: usize,
    /// Number of concurrent processing tasks
    pub concurrency: usize,
    /// Enable compression for temporary storage
    pub enable_compression: bool,
    /// Progress reporting interval, in items processed
    pub progress_interval: usize,
}

impl Default for BulkProcessingConfig {
    fn default() -> Self {
        Self {
            chunk_size: 1000,
            max_memory_usage: 100 * 1024 * 1024, // 100MB
            concurrency: 4,
            enable_compression: true,
            progress_interval: 10_000, // Report progress every 10k items
        }
    }
}

/// Progress information for bulk operations
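///
/// `estimated_completion` is left as `None` by `ChunkProcessor::get_progress`;
/// a caller that knows the total item count (`total_items` below is assumed to
/// be supplied by the caller) can derive it from the rate, as in this
/// illustrative sketch:
///
/// ```ignore
/// let remaining = total_items - progress.processed_items;
/// let eta = std::time::Duration::from_secs_f64(remaining as f64 / progress.processing_rate);
/// ```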
#[derive(Debug, Clone)]
pub struct BulkProgress {
    pub processed_items: usize,
    pub total_items: Option<usize>,
    pub processing_rate: f64, // items per second
    pub estimated_completion: Option<std::time::Duration>,
    pub memory_usage: usize,
}

/// Memory-aware chunk processor
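///
/// Usage sketch (not compiled here; assumes a tokio runtime and the crate's
/// `Result` alias):
///
/// ```ignore
/// let mut processor = ChunkProcessor::new(BulkProcessingConfig::default());
/// processor.add_items(vec![1, 2, 3, 4, 5])?;
/// let chunk_sizes = processor
///     .process_chunks(|chunk| async move { Ok(chunk.len()) })
///     .await?;
/// ```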
pub struct ChunkProcessor<T> {
    config: BulkProcessingConfig,
    buffer: VecDeque<T>,
    processed_count: usize,
    start_time: std::time::Instant,
    last_progress_report: usize,
}

impl<T> ChunkProcessor<T>
where
    T: Clone + Send + Sync,
{
    pub fn new(config: BulkProcessingConfig) -> Self {
        Self {
            config,
            buffer: VecDeque::new(),
            processed_count: 0,
            start_time: std::time::Instant::now(),
            last_progress_report: 0,
        }
    }

    /// Add items to the processing buffer
    pub fn add_items(&mut self, items: Vec<T>) -> Result<()> {
        // Check memory usage
        let estimated_memory = self.estimate_memory_usage(&items);
        if estimated_memory > self.config.max_memory_usage {
            return Err(Error::Custom(
                "Memory limit exceeded. Consider reducing chunk size.".to_string(),
            ));
        }

        self.buffer.extend(items);
        Ok(())
    }

    /// Process items in chunks with a callback function
    pub async fn process_chunks<F, Fut, R>(&mut self, mut processor: F) -> Result<Vec<R>>
    where
        F: FnMut(Vec<T>) -> Fut,
        Fut: std::future::Future<Output = Result<R>>,
        R: Send,
    {
        let mut results = Vec::new();

        while !self.buffer.is_empty() {
            // Take a chunk from the buffer
            let chunk_size = std::cmp::min(self.config.chunk_size, self.buffer.len());
            let chunk: Vec<T> = self.buffer.drain(..chunk_size).collect();

            // Process the chunk
            match processor(chunk).await {
                Ok(result) => {
                    results.push(result);
                    self.processed_count += chunk_size;

                    // Report progress if needed
                    if self.processed_count - self.last_progress_report
                        >= self.config.progress_interval
                    {
                        self.report_progress();
                        self.last_progress_report = self.processed_count;
                    }
                }
                Err(e) => {
                    warn!("Chunk processing failed: {}", e);
                    return Err(e);
                }
            }
        }

        info!("Completed processing {} items", self.processed_count);
        Ok(results)
    }

    /// Estimate memory usage of items
    fn estimate_memory_usage(&self, items: &[T]) -> usize {
        // Simple estimation: size_of::<T>() per item plus ~1KB of assumed
        // heap overhead per item. In production, this could be more
        // sophisticated (e.g. a per-type size hint).
        (std::mem::size_of::<T>() + 1024) * (self.buffer.len() + items.len())
    }

    /// Report processing progress
    fn report_progress(&self) {
        let elapsed = self.start_time.elapsed();
        // Guard against a zero-duration elapsed time, mirroring get_progress
        let rate = if elapsed.as_secs_f64() > 0.0 {
            self.processed_count as f64 / elapsed.as_secs_f64()
        } else {
            0.0
        };

        debug!(
            "Processed {} items, rate: {:.2} items/sec, elapsed: {:?}",
            self.processed_count, rate, elapsed
        );
    }

    /// Get current progress
    pub fn get_progress(&self) -> BulkProgress {
        let elapsed = self.start_time.elapsed();
        let rate = if elapsed.as_secs_f64() > 0.0 {
            self.processed_count as f64 / elapsed.as_secs_f64()
        } else {
            0.0
        };

        BulkProgress {
            processed_items: self.processed_count,
            total_items: None, // Would need to be set externally
            processing_rate: rate,
            estimated_completion: None, // Would need total_items to calculate
            memory_usage: self.estimate_memory_usage(&[]),
        }
    }
}

/// Streaming data processor for very large datasets
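///
/// Usage sketch (not compiled here): producers send through cloned senders
/// and must drop them so the channel closes and `process_stream` can finish.
///
/// ```ignore
/// let mut streaming = StreamingProcessor::new(BulkProcessingConfig::default());
/// let sender = streaming.get_sender();
/// tokio::spawn(async move {
///     for item in 0..100 {
///         let _ = sender.send(item);
///     }
///     // `sender` dropped here
/// });
/// let results = streaming
///     .process_stream(|chunk| async move { Ok(chunk.len()) })
///     .await?;
/// ```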
pub struct StreamingProcessor<T> {
    // Held in an Option so that process_stream can release it; if the
    // processor kept a live sender, the channel would never close and the
    // processing loop would never terminate.
    sender: Option<mpsc::UnboundedSender<T>>,
    receiver: mpsc::UnboundedReceiver<T>,
    config: BulkProcessingConfig,
}

impl<T> StreamingProcessor<T>
where
    T: Send + 'static,
{
    pub fn new(config: BulkProcessingConfig) -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();

        Self {
            sender: Some(sender),
            receiver,
            config,
        }
    }

    /// Get a sender for streaming data
    ///
    /// Panics if called after `process_stream` has released the internal
    /// sender.
    pub fn get_sender(&self) -> mpsc::UnboundedSender<T> {
        self.sender
            .as_ref()
            .expect("internal sender already released by process_stream")
            .clone()
    }

    /// Process streamed items in fixed-size chunks
    ///
    /// Note that the underlying channel is unbounded, so producers are not
    /// backpressured; memory use is bounded only by how quickly chunks are
    /// consumed.
    pub async fn process_stream<F, Fut, R>(&mut self, mut processor: F) -> Result<Vec<R>>
    where
        F: FnMut(Vec<T>) -> Fut,
        Fut: std::future::Future<Output = Result<R>>,
        R: Send,
    {
        // Release the internally held sender so the channel closes (and the
        // loop below terminates) once all external senders are dropped.
        drop(self.sender.take());

        let mut results = Vec::new();
        let mut buffer = Vec::with_capacity(self.config.chunk_size);

        while let Some(item) = self.receiver.recv().await {
            buffer.push(item);

            // Process when the chunk is full
            if buffer.len() >= self.config.chunk_size {
                let chunk = std::mem::take(&mut buffer);
                results.push(processor(chunk).await?);
            }
        }

        // Process any remaining items
        if !buffer.is_empty() {
            results.push(processor(buffer).await?);
        }

        Ok(results)
    }
}

/// Async stream adapter for bulk data
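///
/// Usage sketch with `futures::StreamExt` (not compiled here):
///
/// ```ignore
/// use futures::StreamExt;
///
/// let stream = BulkDataStream::new(vec![1, 2, 3, 4, 5], 2);
/// let chunks: Vec<Vec<i32>> = stream.collect().await; // [[1, 2], [3, 4], [5]]
/// ```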
pub struct BulkDataStream<T> {
    data: VecDeque<T>,
    chunk_size: usize,
}

impl<T> BulkDataStream<T> {
    pub fn new(data: Vec<T>, chunk_size: usize) -> Self {
        Self {
            data: VecDeque::from(data),
            chunk_size,
        }
    }
}

impl<T> Stream for BulkDataStream<T>
where
    T: Unpin, // items are moved out via `drain`, so no `Clone` bound is needed
{
    type Item = Vec<T>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.data.is_empty() {
            return Poll::Ready(None);
        }

        let chunk_size = std::cmp::min(self.chunk_size, self.data.len());
        let chunk: Vec<T> = self.data.drain(..chunk_size).collect();

        Poll::Ready(Some(chunk))
    }
}

/// Memory-efficient data aggregator
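///
/// Usage sketch (not compiled here):
///
/// ```ignore
/// let mut aggregator = DataAggregator::new(BulkProcessingConfig::default());
/// aggregator.add_batch(vec![1, 2, 3])?;
/// aggregator.add_batch(vec![4, 5])?;
/// // Chunks of at most 2 items: [1, 2], [3, 4], [5]
/// let chunks = aggregator.drain_chunks(2);
/// ```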
pub struct DataAggregator<T> {
    config: BulkProcessingConfig,
    temp_storage: Vec<Vec<T>>,
    total_items: usize,
}

impl<T> DataAggregator<T>
where
    T: Clone + Send + Sync,
{
    pub fn new(config: BulkProcessingConfig) -> Self {
        Self {
            config,
            temp_storage: Vec::new(),
            total_items: 0,
        }
    }

    /// Add a batch of items
    pub fn add_batch(&mut self, batch: Vec<T>) -> Result<()> {
        let batch_size = batch.len();

        // Check if adding this batch would exceed memory limits
        let estimated_new_memory =
            self.estimate_total_memory() + self.estimate_batch_memory(&batch);

        if estimated_new_memory > self.config.max_memory_usage {
            // Try to make room; note that compress_oldest_batch is currently
            // a placeholder that drops the oldest batch rather than
            // compressing it
            if self.config.enable_compression {
                self.compress_oldest_batch()?;
            } else {
                return Err(Error::Custom(
                    "Memory limit exceeded and compression disabled".to_string(),
                ));
            }
        }

        self.temp_storage.push(batch);
        self.total_items += batch_size;
        Ok(())
    }

    /// Get all aggregated data
    pub fn get_all_data(&mut self) -> Vec<T> {
        let mut all_data = Vec::with_capacity(self.total_items);

        for batch in self.temp_storage.drain(..) {
            all_data.extend(batch);
        }

        self.total_items = 0;
        all_data
    }

    /// Get data in chunks for memory-efficient processing
    pub fn drain_chunks(&mut self, chunk_size: usize) -> Vec<Vec<T>> {
        let mut chunks = Vec::new();
        let mut current_chunk = Vec::with_capacity(chunk_size);

        for batch in self.temp_storage.drain(..) {
            for item in batch {
                current_chunk.push(item);

                if current_chunk.len() >= chunk_size {
                    // Hand the full chunk off and start a fresh one
                    chunks.push(std::mem::replace(
                        &mut current_chunk,
                        Vec::with_capacity(chunk_size),
                    ));
                }
            }
        }

        if !current_chunk.is_empty() {
            chunks.push(current_chunk);
        }

        self.total_items = 0;
        chunks
    }

    /// Estimate memory usage of a batch
    fn estimate_batch_memory(&self, batch: &[T]) -> usize {
        // size_of::<T>() per item plus ~1KB of assumed heap overhead per item
        (std::mem::size_of::<T>() + 1024) * batch.len()
    }

    /// Estimate total memory usage
    fn estimate_total_memory(&self) -> usize {
        self.temp_storage
            .iter()
            .map(|batch| self.estimate_batch_memory(batch))
            .sum()
    }

    /// Free memory by evicting the oldest batch
    ///
    /// In a real implementation this would compress the data in place; for
    /// now, the oldest batch is simply dropped, which loses data.
    fn compress_oldest_batch(&mut self) -> Result<()> {
        if self.temp_storage.is_empty() {
            return Ok(());
        }

        warn!("Memory limit reached, dropping oldest batch");
        let removed_batch = self.temp_storage.remove(0);
        self.total_items -= removed_batch.len();

        Ok(())
    }

    /// Get current statistics as `(total_items, batch_count, estimated_memory_bytes)`
    pub fn get_stats(&self) -> (usize, usize, usize) {
        (
            self.total_items,
            self.temp_storage.len(),
            self.estimate_total_memory(),
        )
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_chunk_processor() {
        let config = BulkProcessingConfig {
            chunk_size: 3,
            ..Default::default()
        };

        let mut processor = ChunkProcessor::new(config);

        // Add test data
        let items = vec![1, 2, 3, 4, 5, 6, 7];
        processor.add_items(items).unwrap();

        // Process chunks
        let results = processor
            .process_chunks(|chunk| async move {
                Ok(chunk.len()) // Just return chunk size
            })
            .await
            .unwrap();

        assert_eq!(results, vec![3, 3, 1]); // 3 chunks: [1,2,3], [4,5,6], [7]
    }

    #[test]
    fn test_data_aggregator() {
        let config = BulkProcessingConfig::default();
        let mut aggregator = DataAggregator::new(config);

        // Add batches
        aggregator.add_batch(vec![1, 2, 3]).unwrap();
        aggregator.add_batch(vec![4, 5]).unwrap();

        // Get all data
        let all_data = aggregator.get_all_data();
        assert_eq!(all_data, vec![1, 2, 3, 4, 5]);
    }

    #[tokio::test]
    async fn test_bulk_data_stream() {
        use futures::StreamExt;

        let data = vec![1, 2, 3, 4, 5, 6, 7];
        // `collect` takes the stream by value, so no `mut` binding is needed
        let stream = BulkDataStream::new(data, 3);

        let chunks: Vec<Vec<i32>> = stream.collect().await;
        assert_eq!(chunks, vec![vec![1, 2, 3], vec![4, 5, 6], vec![7]]);
    }
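
    // Minimal sketch exercising StreamingProcessor; relies on process_stream
    // dropping the internally held sender so the channel can close once the
    // producer task finishes.
    #[tokio::test]
    async fn test_streaming_processor() {
        let config = BulkProcessingConfig {
            chunk_size: 2,
            ..Default::default()
        };
        let mut streaming = StreamingProcessor::new(config);
        let sender = streaming.get_sender();

        tokio::spawn(async move {
            for item in 1..=5 {
                let _ = sender.send(item);
            }
            // `sender` dropped here, closing the channel
        });

        let results = streaming
            .process_stream(|chunk| async move { Ok(chunk.len()) })
            .await
            .unwrap();

        assert_eq!(results, vec![2, 2, 1]);
    }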
}