1use crate::error::Result;
2use std::collections::VecDeque;
3
/// Tuning knobs for memory-aware, chunked processing.
#[derive(Debug, Clone)]
pub struct MemoryConfig {
    /// Number of items buffered before a chunk is emitted.
    pub chunk_size: usize,
    /// Soft upper bound on memory use, in megabytes.
    pub max_memory_mb: usize,
    /// Whether streaming (chunked) processing is enabled.
    pub enable_streaming: bool,
    /// Whether chunk compression is enabled.
    pub enable_compression: bool,
}

impl Default for MemoryConfig {
    /// Defaults: 10k-item chunks, 512 MB budget, streaming on, compression off.
    fn default() -> Self {
        MemoryConfig {
            chunk_size: 10000,
            max_memory_mb: 512,
            enable_streaming: true,
            enable_compression: false,
        }
    }
}

impl MemoryConfig {
    /// Picks a chunk size suited to `file_size` (in bytes): small files get
    /// small chunks, anything over 10 MB gets the full default chunk size.
    pub fn adaptive_chunk_size(file_size: u64) -> usize {
        if file_size <= 1_000_000 {
            1000
        } else if file_size <= 10_000_000 {
            5000
        } else {
            10000
        }
    }

    /// Rough memory target (in MB) for a file of `file_size` bytes: half the
    /// file's size in MB for files over 100 MB, double it otherwise.
    pub fn memory_efficiency_target(file_size: u64) -> usize {
        let mb_size = (file_size / 1024 / 1024) as usize;
        if mb_size > 100 {
            mb_size / 2
        } else {
            mb_size * 2
        }
    }
}
44
45pub struct StreamingProcessor<T> {
47 buffer: VecDeque<T>,
48 chunk_size: usize,
49 total_processed: usize,
50}
51
52impl<T> StreamingProcessor<T> {
53 pub fn new(config: &MemoryConfig) -> Self {
54 Self {
55 buffer: VecDeque::new(),
56 chunk_size: config.chunk_size,
57 total_processed: 0,
58 }
59 }
60
61 pub fn push(&mut self, item: T) -> Option<Vec<T>> {
63 self.buffer.push_back(item);
64
65 if self.buffer.len() >= self.chunk_size {
66 self.flush_chunk()
67 } else {
68 None
69 }
70 }
71
72 fn flush_chunk(&mut self) -> Option<Vec<T>> {
74 if self.buffer.is_empty() {
75 return None;
76 }
77
78 let chunk_size = self.chunk_size.min(self.buffer.len());
79 let chunk: Vec<T> = self.buffer.drain(0..chunk_size).collect();
80 self.total_processed += chunk.len();
81
82 Some(chunk)
83 }
84
85 pub fn finish(mut self) -> Option<Vec<T>> {
87 if self.buffer.is_empty() {
88 None
89 } else {
90 let remaining: Vec<T> = self.buffer.into_iter().collect();
91 self.total_processed += remaining.len();
92 Some(remaining)
93 }
94 }
95
96 pub fn processed_count(&self) -> usize {
98 self.total_processed
99 }
100
101 pub fn buffer_size(&self) -> usize {
103 self.buffer.len()
104 }
105}
106
/// Splits an owned `Vec` into consecutive chunks of at most `chunk_size`
/// items, yielding each chunk as a fresh `Vec`.
pub struct ChunkIterator<T> {
    data: Vec<T>,
    chunk_size: usize,
    current_index: usize,
}

impl<T> ChunkIterator<T> {
    /// Creates an iterator over `data` producing chunks of `chunk_size`.
    ///
    /// A `chunk_size` of 0 yields no chunks (the iterator is immediately
    /// exhausted) instead of looping forever.
    pub fn new(data: Vec<T>, chunk_size: usize) -> Self {
        Self {
            data,
            chunk_size,
            current_index: 0,
        }
    }
}

impl<T: Clone> Iterator for ChunkIterator<T> {
    type Item = Vec<T>;

    fn next(&mut self) -> Option<Self::Item> {
        // Guard: a zero chunk size would never advance `current_index`,
        // producing an endless stream of empty chunks. Treat it as exhausted.
        if self.chunk_size == 0 || self.current_index >= self.data.len() {
            return None;
        }

        let end_index = (self.current_index + self.chunk_size).min(self.data.len());
        let chunk = self.data[self.current_index..end_index].to_vec();
        self.current_index = end_index;

        Some(chunk)
    }
}
139
/// Single-pass (Welford) summary statistics that can be updated one value at
/// a time and merged across independently built accumulators.
#[derive(Debug, Clone)]
pub struct IncrementalStatistics {
    count: usize,
    sum: f64,
    sum_squares: f64,
    min: f64,
    max: f64,
    // Welford running state: sum of squared deviations from the mean.
    m2: f64,
    mean: f64,
}

impl Default for IncrementalStatistics {
    fn default() -> Self {
        // min/max start at the identity elements so the first `add` wins.
        IncrementalStatistics {
            count: 0,
            sum: 0.0,
            sum_squares: 0.0,
            min: f64::INFINITY,
            max: f64::NEG_INFINITY,
            m2: 0.0,
            mean: 0.0,
        }
    }
}

impl IncrementalStatistics {
    /// Creates an empty accumulator.
    pub fn new() -> Self {
        Self::default()
    }

    /// Folds one observation into the running statistics.
    pub fn add(&mut self, value: f64) {
        self.count += 1;
        self.sum += value;
        self.sum_squares += value * value;
        self.min = self.min.min(value);
        self.max = self.max.max(value);

        // Welford update: `delta` uses the old mean, the second factor the new one.
        let delta = value - self.mean;
        self.mean += delta / self.count as f64;
        self.m2 += delta * (value - self.mean);
    }

    /// Adds every value in `values`, in order.
    pub fn add_batch(&mut self, values: &[f64]) {
        values.iter().for_each(|&v| self.add(v));
    }

    /// Combines `other` into `self` using the parallel Welford merge, so the
    /// result matches having added both data sets to a single accumulator.
    pub fn merge(&mut self, other: &IncrementalStatistics) {
        if other.count == 0 {
            return;
        }
        if self.count == 0 {
            *self = other.clone();
            return;
        }

        let n_total = self.count + other.count;
        let delta = other.mean - self.mean;
        let merged_mean =
            (self.count as f64 * self.mean + other.count as f64 * other.mean) / n_total as f64;
        let merged_m2 = self.m2
            + other.m2
            + delta * delta * (self.count as f64 * other.count as f64) / n_total as f64;

        self.count = n_total;
        self.sum += other.sum;
        self.sum_squares += other.sum_squares;
        self.min = self.min.min(other.min);
        self.max = self.max.max(other.max);
        self.mean = merged_mean;
        self.m2 = merged_m2;
    }

    /// Arithmetic mean of all values seen (0.0 when empty).
    pub fn mean(&self) -> f64 {
        self.mean
    }

    /// Sample variance (n - 1 denominator); 0.0 with fewer than two values.
    pub fn variance(&self) -> f64 {
        match self.count {
            0 | 1 => 0.0,
            n => self.m2 / (n - 1) as f64,
        }
    }

    /// Sample standard deviation.
    pub fn std_dev(&self) -> f64 {
        self.variance().sqrt()
    }

    /// Number of values added.
    pub fn count(&self) -> usize {
        self.count
    }

    /// Smallest value seen (`f64::INFINITY` when empty).
    pub fn min(&self) -> f64 {
        self.min
    }

    /// Largest value seen (`f64::NEG_INFINITY` when empty).
    pub fn max(&self) -> f64 {
        self.max
    }
}
257
258#[derive(Debug, Clone, Default)]
260pub struct IncrementalBenford {
261 first_digit_counts: [usize; 9],
262 total_count: usize,
263}
264
265impl IncrementalBenford {
266 pub fn new() -> Self {
267 Self::default()
268 }
269
270 pub fn add(&mut self, value: f64) {
272 let abs_value = value.abs();
273 if abs_value >= 1.0 {
274 let first_digit = get_first_digit(abs_value);
275 if (1..=9).contains(&first_digit) {
276 self.first_digit_counts[first_digit - 1] += 1;
277 self.total_count += 1;
278 }
279 }
280 }
281
282 pub fn add_batch(&mut self, values: &[f64]) {
284 for &value in values {
285 self.add(value);
286 }
287 }
288
289 pub fn merge(&mut self, other: &IncrementalBenford) {
291 for (i, &other_count) in other.first_digit_counts.iter().enumerate() {
292 self.first_digit_counts[i] += other_count;
293 }
294 self.total_count += other.total_count;
295 }
296
297 pub fn calculate_mad(&self) -> f64 {
299 if self.total_count == 0 {
300 return 0.0;
301 }
302
303 let expected_proportions = [
304 30.103, 17.609, 12.494, 9.691, 7.918, 6.695, 5.799, 5.115, 4.576,
305 ];
306
307 let mut mad = 0.0;
308 for (i, &expected) in expected_proportions.iter().enumerate() {
309 let observed_prop =
310 (self.first_digit_counts[i] as f64 / self.total_count as f64) * 100.0;
311 mad += (observed_prop - expected).abs();
312 }
313
314 mad / 9.0
315 }
316
317 pub fn get_distribution(&self) -> [f64; 9] {
319 let mut distribution = [0.0; 9];
320 if self.total_count > 0 {
321 for (i, item) in distribution.iter_mut().enumerate() {
322 *item = (self.first_digit_counts[i] as f64 / self.total_count as f64) * 100.0;
323 }
324 }
325 distribution
326 }
327
328 pub fn get_counts(&self) -> &[usize; 9] {
330 &self.first_digit_counts
331 }
332
333 pub fn total_count(&self) -> usize {
335 self.total_count
336 }
337}
338
/// Outcome of one streaming analysis run, pairing the accumulated analysis
/// result with bookkeeping about how the data was processed.
#[derive(Debug, Clone)]
pub struct ChunkAnalysisResult<T> {
    // Number of chunks (including the final partial one) merged into `result`.
    pub chunks_processed: usize,
    // Total number of input items consumed.
    pub total_items: usize,
    // Estimated memory footprint of the processed items, in megabytes
    // (payload-only estimate computed by the streaming_* functions).
    pub memory_used_mb: f64,
    // Wall-clock duration of the analysis, in milliseconds.
    pub processing_time_ms: u64,
    // The accumulated analysis result (e.g. IncrementalBenford).
    pub result: T,
}
348
349pub fn streaming_benford_analysis<I>(
351 data_iter: I,
352 config: &MemoryConfig,
353) -> Result<ChunkAnalysisResult<IncrementalBenford>>
354where
355 I: Iterator<Item = f64>,
356{
357 let start_time = std::time::Instant::now();
358 let mut processor = StreamingProcessor::new(config);
359 let mut benford = IncrementalBenford::new();
360 let mut chunks_processed = 0;
361
362 for value in data_iter {
363 if let Some(chunk) = processor.push(value) {
364 let mut chunk_benford = IncrementalBenford::new();
365 chunk_benford.add_batch(&chunk);
366 benford.merge(&chunk_benford);
367 chunks_processed += 1;
368 }
369 }
370
371 let mut total_processed = processor.processed_count();
373
374 if let Some(remaining) = processor.finish() {
376 total_processed += remaining.len(); let mut chunk_benford = IncrementalBenford::new();
378 chunk_benford.add_batch(&remaining);
379 benford.merge(&chunk_benford);
380 chunks_processed += 1;
381 }
382
383 let memory_used_mb = (total_processed * std::mem::size_of::<f64>()) as f64 / 1024.0 / 1024.0;
384
385 Ok(ChunkAnalysisResult {
386 chunks_processed,
387 total_items: total_processed,
388 memory_used_mb,
389 processing_time_ms: start_time.elapsed().as_millis() as u64,
390 result: benford,
391 })
392}
393
394pub fn streaming_statistics_analysis<I>(
396 data_iter: I,
397 config: &MemoryConfig,
398) -> Result<ChunkAnalysisResult<IncrementalStatistics>>
399where
400 I: Iterator<Item = f64>,
401{
402 let start_time = std::time::Instant::now();
403 let mut processor = StreamingProcessor::new(config);
404 let mut stats = IncrementalStatistics::new();
405 let mut chunks_processed = 0;
406
407 for value in data_iter {
408 if let Some(chunk) = processor.push(value) {
409 let mut chunk_stats = IncrementalStatistics::new();
410 chunk_stats.add_batch(&chunk);
411 stats.merge(&chunk_stats);
412 chunks_processed += 1;
413 }
414 }
415
416 let total_processed = processor.processed_count();
418
419 if let Some(remaining) = processor.finish() {
421 let mut chunk_stats = IncrementalStatistics::new();
422 chunk_stats.add_batch(&remaining);
423 stats.merge(&chunk_stats);
424 chunks_processed += 1;
425 }
426
427 let memory_used_mb = (total_processed * std::mem::size_of::<f64>()) as f64 / 1024.0 / 1024.0;
428
429 Ok(ChunkAnalysisResult {
430 chunks_processed,
431 total_items: total_processed,
432 memory_used_mb,
433 processing_time_ms: start_time.elapsed().as_millis() as u64,
434 result: stats,
435 })
436}
437
438#[derive(Debug, Clone)]
440pub struct IncrementalPareto {
441 values: Vec<f64>,
442 sorted_values: Vec<f64>,
443 needs_sorting: bool,
444 statistics: IncrementalStatistics,
445}
446
447impl Default for IncrementalPareto {
448 fn default() -> Self {
449 Self::new()
450 }
451}
452
453impl IncrementalPareto {
454 pub fn new() -> Self {
455 Self {
456 values: Vec::new(),
457 sorted_values: Vec::new(),
458 needs_sorting: true,
459 statistics: IncrementalStatistics::new(),
460 }
461 }
462
463 pub fn add(&mut self, value: f64) {
464 self.values.push(value);
465 self.statistics.add(value);
466 self.needs_sorting = true;
467 }
468
469 pub fn add_batch(&mut self, values: &[f64]) {
470 for &value in values {
471 self.add(value);
472 }
473 }
474
475 pub fn merge(&mut self, other: &IncrementalPareto) {
476 self.values.extend_from_slice(&other.values);
477 self.statistics.merge(&other.statistics);
478 self.needs_sorting = true;
479 }
480
481 pub fn get_sorted_values(&mut self) -> &[f64] {
482 if self.needs_sorting {
483 self.sorted_values = self.values.clone();
484 self.sorted_values.sort_by(|a, b| b.partial_cmp(a).unwrap());
485 self.needs_sorting = false;
486 }
487 &self.sorted_values
488 }
489
490 pub fn statistics(&self) -> &IncrementalStatistics {
491 &self.statistics
492 }
493
494 pub fn count(&self) -> usize {
495 self.values.len()
496 }
497}
498
/// Incremental word-frequency counter for Zipf's-law analysis.
#[derive(Debug, Clone)]
pub struct IncrementalZipf {
    frequency_map: std::collections::HashMap<String, usize>,
    total_count: usize,
}

impl Default for IncrementalZipf {
    fn default() -> Self {
        Self::new()
    }
}

impl IncrementalZipf {
    /// Creates an empty counter.
    pub fn new() -> Self {
        Self {
            frequency_map: std::collections::HashMap::new(),
            total_count: 0,
        }
    }

    /// Counts one occurrence of `word`.
    pub fn add_word(&mut self, word: String) {
        *self.frequency_map.entry(word).or_insert(0) += 1;
        self.total_count += 1;
    }

    /// Counts one occurrence of each word in `words`.
    pub fn add_words(&mut self, words: &[String]) {
        for word in words {
            self.add_word(word.clone());
        }
    }

    /// Adds `other`'s counts into `self`.
    pub fn merge(&mut self, other: &IncrementalZipf) {
        for (word, count) in &other.frequency_map {
            *self.frequency_map.entry(word.clone()).or_insert(0) += count;
        }
        self.total_count += other.total_count;
    }

    /// Returns `(word, count)` pairs sorted by descending count.
    ///
    /// FIX: ties are now broken alphabetically so the ordering is
    /// deterministic; previously tied words appeared in arbitrary `HashMap`
    /// iteration order, which varies between runs.
    pub fn get_sorted_frequencies(&self) -> Vec<(String, usize)> {
        let mut frequencies: Vec<_> = self
            .frequency_map
            .iter()
            .map(|(word, &count)| (word.clone(), count))
            .collect();
        frequencies.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
        frequencies
    }

    /// Total number of word occurrences recorded.
    pub fn total_count(&self) -> usize {
        self.total_count
    }

    /// Number of distinct words recorded.
    pub fn unique_words(&self) -> usize {
        self.frequency_map.len()
    }
}
556
557#[derive(Debug, Clone)]
559pub struct IncrementalNormal {
560 statistics: IncrementalStatistics,
561 values: Vec<f64>, }
563
564impl Default for IncrementalNormal {
565 fn default() -> Self {
566 Self::new()
567 }
568}
569
570impl IncrementalNormal {
571 pub fn new() -> Self {
572 Self {
573 statistics: IncrementalStatistics::new(),
574 values: Vec::new(),
575 }
576 }
577
578 pub fn add(&mut self, value: f64) {
579 self.statistics.add(value);
580 self.values.push(value);
581 }
582
583 pub fn add_batch(&mut self, values: &[f64]) {
584 for &value in values {
585 self.add(value);
586 }
587 }
588
589 pub fn merge(&mut self, other: &IncrementalNormal) {
590 self.statistics.merge(&other.statistics);
591 self.values.extend_from_slice(&other.values);
592 }
593
594 pub fn statistics(&self) -> &IncrementalStatistics {
595 &self.statistics
596 }
597
598 pub fn values(&self) -> &[f64] {
599 &self.values
600 }
601
602 pub fn count(&self) -> usize {
603 self.statistics.count
604 }
605}
606
607#[derive(Debug, Clone)]
609pub struct IncrementalPoisson {
610 event_counts: Vec<usize>,
611 statistics: IncrementalStatistics,
612}
613
614impl Default for IncrementalPoisson {
615 fn default() -> Self {
616 Self::new()
617 }
618}
619
620impl IncrementalPoisson {
621 pub fn new() -> Self {
622 Self {
623 event_counts: Vec::new(),
624 statistics: IncrementalStatistics::new(),
625 }
626 }
627
628 pub fn add_count(&mut self, count: usize) {
629 let count_f64 = count as f64;
630 self.event_counts.push(count);
631 self.statistics.add(count_f64);
632 }
633
634 pub fn add_counts(&mut self, counts: &[usize]) {
635 for &count in counts {
636 self.add_count(count);
637 }
638 }
639
640 pub fn merge(&mut self, other: &IncrementalPoisson) {
641 self.event_counts.extend_from_slice(&other.event_counts);
642 self.statistics.merge(&other.statistics);
643 }
644
645 pub fn statistics(&self) -> &IncrementalStatistics {
646 &self.statistics
647 }
648
649 pub fn event_counts(&self) -> &[usize] {
650 &self.event_counts
651 }
652
653 pub fn count(&self) -> usize {
654 self.event_counts.len()
655 }
656}
657
658pub fn streaming_pareto_analysis<I>(
660 data_iter: I,
661 config: &MemoryConfig,
662) -> Result<ChunkAnalysisResult<IncrementalPareto>>
663where
664 I: Iterator<Item = f64>,
665{
666 let start_time = std::time::Instant::now();
667 let mut processor = StreamingProcessor::new(config);
668 let mut pareto = IncrementalPareto::new();
669 let mut chunks_processed = 0;
670
671 for value in data_iter {
672 if let Some(chunk) = processor.push(value) {
673 let mut chunk_pareto = IncrementalPareto::new();
674 chunk_pareto.add_batch(&chunk);
675 pareto.merge(&chunk_pareto);
676 chunks_processed += 1;
677 }
678 }
679
680 let mut total_processed = processor.processed_count();
681
682 if let Some(remaining) = processor.finish() {
683 total_processed += remaining.len();
684 let mut chunk_pareto = IncrementalPareto::new();
685 chunk_pareto.add_batch(&remaining);
686 pareto.merge(&chunk_pareto);
687 chunks_processed += 1;
688 }
689
690 let memory_used_mb = (total_processed * std::mem::size_of::<f64>()) as f64 / 1024.0 / 1024.0;
691
692 Ok(ChunkAnalysisResult {
693 chunks_processed,
694 total_items: total_processed,
695 memory_used_mb,
696 processing_time_ms: start_time.elapsed().as_millis() as u64,
697 result: pareto,
698 })
699}
700
701pub fn streaming_normal_analysis<I>(
703 data_iter: I,
704 config: &MemoryConfig,
705) -> Result<ChunkAnalysisResult<IncrementalNormal>>
706where
707 I: Iterator<Item = f64>,
708{
709 let start_time = std::time::Instant::now();
710 let mut processor = StreamingProcessor::new(config);
711 let mut normal = IncrementalNormal::new();
712 let mut chunks_processed = 0;
713
714 for value in data_iter {
715 if let Some(chunk) = processor.push(value) {
716 let mut chunk_normal = IncrementalNormal::new();
717 chunk_normal.add_batch(&chunk);
718 normal.merge(&chunk_normal);
719 chunks_processed += 1;
720 }
721 }
722
723 let mut total_processed = processor.processed_count();
724
725 if let Some(remaining) = processor.finish() {
726 total_processed += remaining.len();
727 let mut chunk_normal = IncrementalNormal::new();
728 chunk_normal.add_batch(&remaining);
729 normal.merge(&chunk_normal);
730 chunks_processed += 1;
731 }
732
733 let memory_used_mb = (total_processed * std::mem::size_of::<f64>()) as f64 / 1024.0 / 1024.0;
734
735 Ok(ChunkAnalysisResult {
736 chunks_processed,
737 total_items: total_processed,
738 memory_used_mb,
739 processing_time_ms: start_time.elapsed().as_millis() as u64,
740 result: normal,
741 })
742}
743
744pub fn streaming_poisson_analysis<I>(
746 data_iter: I,
747 config: &MemoryConfig,
748) -> Result<ChunkAnalysisResult<IncrementalPoisson>>
749where
750 I: Iterator<Item = usize>,
751{
752 let start_time = std::time::Instant::now();
753 let mut processor = StreamingProcessor::new(config);
754 let mut poisson = IncrementalPoisson::new();
755 let mut chunks_processed = 0;
756
757 for value in data_iter {
758 if let Some(chunk) = processor.push(value) {
759 let mut chunk_poisson = IncrementalPoisson::new();
760 chunk_poisson.add_counts(&chunk);
761 poisson.merge(&chunk_poisson);
762 chunks_processed += 1;
763 }
764 }
765
766 let mut total_processed = processor.processed_count();
767
768 if let Some(remaining) = processor.finish() {
769 total_processed += remaining.len();
770 let mut chunk_poisson = IncrementalPoisson::new();
771 chunk_poisson.add_counts(&remaining);
772 poisson.merge(&chunk_poisson);
773 chunks_processed += 1;
774 }
775
776 let memory_used_mb = (total_processed * std::mem::size_of::<usize>()) as f64 / 1024.0 / 1024.0;
777
778 Ok(ChunkAnalysisResult {
779 chunks_processed,
780 total_items: total_processed,
781 memory_used_mb,
782 processing_time_ms: start_time.elapsed().as_millis() as u64,
783 result: poisson,
784 })
785}
786
787pub fn streaming_zipf_analysis<I>(
789 data_iter: I,
790 config: &MemoryConfig,
791) -> Result<ChunkAnalysisResult<IncrementalZipf>>
792where
793 I: Iterator<Item = String>,
794{
795 let start_time = std::time::Instant::now();
796 let mut processor = StreamingProcessor::new(config);
797 let mut zipf = IncrementalZipf::new();
798 let mut chunks_processed = 0;
799
800 for word in data_iter {
801 if let Some(chunk) = processor.push(word) {
802 let mut chunk_zipf = IncrementalZipf::new();
803 chunk_zipf.add_words(&chunk);
804 zipf.merge(&chunk_zipf);
805 chunks_processed += 1;
806 }
807 }
808
809 let mut total_processed = processor.processed_count();
810
811 if let Some(remaining) = processor.finish() {
812 total_processed += remaining.len();
813 let mut chunk_zipf = IncrementalZipf::new();
814 chunk_zipf.add_words(&remaining);
815 zipf.merge(&chunk_zipf);
816 chunks_processed += 1;
817 }
818
819 let memory_used_mb = (total_processed * 20) as f64 / 1024.0 / 1024.0;
821
822 Ok(ChunkAnalysisResult {
823 chunks_processed,
824 total_items: total_processed,
825 memory_used_mb,
826 processing_time_ms: start_time.elapsed().as_millis() as u64,
827 result: zipf,
828 })
829}
830
/// Tracks a simple model of memory usage: current and peak modeled usage
/// plus the number of allocations recorded.
///
/// All fields default to zero, so `Default` is derived instead of the
/// previous hand-written impl (identical behavior, less code).
#[derive(Debug, Clone, Default)]
pub struct ResourceMonitor {
    peak_memory_mb: f64,
    current_memory_mb: f64,
    allocation_count: usize,
}

impl ResourceMonitor {
    /// Creates a monitor with all counters at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records an allocation of `size_bytes`, updating current and peak usage.
    pub fn record_allocation(&mut self, size_bytes: usize) {
        let size_mb = size_bytes as f64 / 1024.0 / 1024.0;
        self.current_memory_mb += size_mb;
        self.peak_memory_mb = self.peak_memory_mb.max(self.current_memory_mb);
        self.allocation_count += 1;
    }

    /// Records a deallocation of `size_bytes`; usage is clamped at zero so
    /// unmatched deallocations cannot drive the model negative.
    pub fn record_deallocation(&mut self, size_bytes: usize) {
        let size_mb = size_bytes as f64 / 1024.0 / 1024.0;
        self.current_memory_mb = (self.current_memory_mb - size_mb).max(0.0);
    }

    /// Highest current-usage value observed so far, in MB.
    pub fn peak_memory_mb(&self) -> f64 {
        self.peak_memory_mb
    }

    /// Current modeled usage, in MB.
    pub fn current_memory_mb(&self) -> f64 {
        self.current_memory_mb
    }

    /// Number of allocations recorded.
    pub fn allocation_count(&self) -> usize {
        self.allocation_count
    }
}
884
/// Returns the leading decimal digit (1-9) of `value`, or 0 when no leading
/// digit is defined (zero, negative, NaN, or infinite input — the previous
/// code already returned 0 for negatives via the saturating `as usize` cast).
///
/// BUG FIX: infinity previously hung the divide-by-10 loop forever; it now
/// returns 0. Also generalized to fractions (0.5 -> 5); all current callers
/// pass only finite values >= 1, for which behavior is unchanged.
fn get_first_digit(value: f64) -> usize {
    // Non-finite or non-positive input would make the scaling loops below
    // spin forever (infinity never drops below 10; zero never reaches 1).
    if !value.is_finite() || value <= 0.0 {
        return 0;
    }
    let mut n = value;
    // Scale fractions up into [1, 10) ...
    while n < 1.0 {
        n *= 10.0;
    }
    // ... and larger values down into [1, 10), leaving the leading digit.
    while n >= 10.0 {
        n /= 10.0;
    }
    n as usize
}
893
#[cfg(test)]
mod tests {
    use super::*;

    // Welford accumulator: count/mean/sample-variance of 1..=5 against
    // hand-computed values.
    #[test]
    fn test_incremental_statistics() {
        let mut stats = IncrementalStatistics::new();
        let data = vec![1.0, 2.0, 3.0, 4.0, 5.0];

        stats.add_batch(&data);

        assert_eq!(stats.count(), 5);
        assert!((stats.mean() - 3.0).abs() < 1e-10);
        assert!((stats.variance() - 2.5).abs() < 1e-10);
    }

    // First-digit tally: with leading digits 1, 2, 3, both the digit-1 and
    // digit-2 buckets of the percentage distribution must be non-zero.
    #[test]
    fn test_incremental_benford() {
        let mut benford = IncrementalBenford::new();
        let data = vec![100.0, 200.0, 300.0, 111.0, 222.0];

        benford.add_batch(&data);

        assert_eq!(benford.total_count(), 5);
        let distribution = benford.get_distribution();
        assert!(distribution[0] > 0.0);
        assert!(distribution[1] > 0.0);
    }

    // Chunk size 3: the first two pushes buffer, the third emits a full
    // chunk; `finish` returns the short remainder.
    #[test]
    fn test_streaming_processor() {
        let config = MemoryConfig {
            chunk_size: 3,
            max_memory_mb: 100,
            enable_streaming: true,
            enable_compression: false,
        };

        let mut processor = StreamingProcessor::new(&config);

        assert!(processor.push(1.0).is_none());
        assert!(processor.push(2.0).is_none());
        let chunk = processor.push(3.0);
        assert!(chunk.is_some());
        let chunk = chunk.unwrap();
        assert_eq!(chunk.len(), 3);
        assert_eq!(chunk, vec![1.0, 2.0, 3.0]);

        assert_eq!(processor.processed_count(), 3);

        assert!(processor.push(4.0).is_none());
        assert!(processor.push(5.0).is_none());

        let remaining = processor.finish();
        assert!(remaining.is_some());
        let remaining = remaining.unwrap();
        assert_eq!(remaining, vec![4.0, 5.0]);
    }

    // 7 items in chunks of 3: two full chunks plus a final short chunk,
    // then exhaustion.
    #[test]
    fn test_chunk_iterator() {
        let data = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0];
        let mut iterator = ChunkIterator::new(data, 3);

        let chunk1 = iterator.next().unwrap();
        assert_eq!(chunk1, vec![1.0, 2.0, 3.0]);

        let chunk2 = iterator.next().unwrap();
        assert_eq!(chunk2, vec![4.0, 5.0, 6.0]);

        let chunk3 = iterator.next().unwrap();
        assert_eq!(chunk3, vec![7.0]);

        assert!(iterator.next().is_none());
    }

    // Parallel Welford merge: merging [1,2,3] with [4,5,6] must equal the
    // statistics of the combined data set.
    #[test]
    fn test_incremental_statistics_merge() {
        let mut stats1 = IncrementalStatistics::new();
        let mut stats2 = IncrementalStatistics::new();

        stats1.add_batch(&[1.0, 2.0, 3.0]);
        stats2.add_batch(&[4.0, 5.0, 6.0]);

        stats1.merge(&stats2);

        assert_eq!(stats1.count(), 6);
        assert!((stats1.mean() - 3.5).abs() < 1e-10);
        assert!(stats1.variance() > 0.0);
    }

    // Merging two tallies sums their totals; MAD stays non-negative.
    #[test]
    fn test_incremental_benford_merge() {
        let mut benford1 = IncrementalBenford::new();
        let mut benford2 = IncrementalBenford::new();

        benford1.add_batch(&[100.0, 200.0]);
        benford2.add_batch(&[300.0, 111.0]);

        let count1 = benford1.total_count();
        let count2 = benford2.total_count();

        benford1.merge(&benford2);

        assert_eq!(benford1.total_count(), count1 + count2);
        assert!(benford1.calculate_mad() >= 0.0);
    }

    // End-to-end streaming Benford run: 7 values with chunk size 3 yields
    // at least one chunk plus the remainder, and a populated result.
    #[test]
    fn test_streaming_benford_analysis() {
        let config = MemoryConfig {
            chunk_size: 3,
            max_memory_mb: 100,
            enable_streaming: true,
            enable_compression: false,
        };
        let data = vec![100.0, 200.0, 300.0, 111.0, 222.0, 333.0, 444.0];

        let result = streaming_benford_analysis(data.into_iter(), &config).unwrap();

        assert!(result.chunks_processed >= 1);
        assert!(result.total_items > 0);
        assert!(result.memory_used_mb > 0.0);
        assert!(result.result.total_count() > 0);
    }

    // End-to-end streaming statistics run: 10 values with chunk size 4.
    #[test]
    fn test_streaming_statistics_analysis() {
        let config = MemoryConfig {
            chunk_size: 4,
            max_memory_mb: 100,
            enable_streaming: true,
            enable_compression: false,
        };
        let data = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0];

        let result = streaming_statistics_analysis(data.into_iter(), &config).unwrap();

        assert!(result.chunks_processed >= 1);
        assert!(result.total_items > 0);
        assert!(result.memory_used_mb > 0.0);
        assert!(result.result.count() > 0);
    }

    // Allocation raises current and peak usage; a partial deallocation
    // lowers current below the recorded peak.
    #[test]
    fn test_resource_monitor() {
        let mut monitor = ResourceMonitor::new();

        assert_eq!(monitor.peak_memory_mb(), 0.0);
        assert_eq!(monitor.current_memory_mb(), 0.0);
        assert_eq!(monitor.allocation_count(), 0);

        monitor.record_allocation(1024 * 1024);
        assert_eq!(monitor.allocation_count(), 1);
        assert!(monitor.current_memory_mb() > 0.0);
        assert!(monitor.peak_memory_mb() > 0.0);

        monitor.record_deallocation(512 * 1024);
        assert!(monitor.current_memory_mb() > 0.0);
        assert!(monitor.current_memory_mb() < monitor.peak_memory_mb());
    }
}