Skip to main content

lawkit_core/common/
memory.rs

1use crate::error::Result;
2use std::collections::VecDeque;
3
/// Settings for memory-efficient processing.
#[derive(Debug, Clone)]
pub struct MemoryConfig {
    /// Number of items collected into one chunk.
    pub chunk_size: usize,
    /// Memory limit in megabytes.
    pub max_memory_mb: usize,
    /// Whether streaming is enabled.
    pub enable_streaming: bool,
    /// Whether compression is enabled.
    pub enable_compression: bool,
}

impl Default for MemoryConfig {
    /// Defaults: chunks of 10000 items, a 512 MB limit, streaming on, compression off.
    fn default() -> Self {
        MemoryConfig {
            enable_compression: false,
            enable_streaming: true,
            max_memory_mb: 512,
            chunk_size: 10000,
        }
    }
}

impl MemoryConfig {
    /// Adaptive chunk size, based on lessons from diffx.
    ///
    /// `file_size` is compared against decimal byte thresholds:
    /// up to 1,000,000 bytes gives 1000, up to 10,000,000 bytes gives 5000,
    /// anything larger gives 10000.
    pub fn adaptive_chunk_size(file_size: u64) -> usize {
        if file_size <= 1_000_000 {
            // Small file: small chunks.
            1000
        } else if file_size <= 10_000_000 {
            // Medium file: medium chunks.
            5000
        } else {
            // Large file: large chunks.
            10000
        }
    }

    /// diffx pattern: memory efficiency target (1.5x-2x input size), in megabytes.
    ///
    /// `file_size` is first truncated to whole mebibytes (integer division by
    /// 1024 twice), so any file smaller than 1 MiB produces a target of 0.
    /// Up to and including 100 MiB the target is twice the size; above that it
    /// is half the size.
    pub fn memory_efficiency_target(file_size: u64) -> usize {
        let whole_mb = (file_size / 1024 / 1024) as usize;
        match whole_mb {
            // Small files may use 2x their size.
            0..=100 => whole_mb * 2,
            // Large files are limited to 50% of their size.
            _ => whole_mb / 2,
        }
    }
}
44
45/// ストリーミングデータプロセッサ
46pub struct StreamingProcessor<T> {
47    buffer: VecDeque<T>,
48    chunk_size: usize,
49    total_processed: usize,
50}
51
52impl<T> StreamingProcessor<T> {
53    pub fn new(config: &MemoryConfig) -> Self {
54        Self {
55            buffer: VecDeque::new(),
56            chunk_size: config.chunk_size,
57            total_processed: 0,
58        }
59    }
60
61    /// データを追加
62    pub fn push(&mut self, item: T) -> Option<Vec<T>> {
63        self.buffer.push_back(item);
64
65        if self.buffer.len() >= self.chunk_size {
66            self.flush_chunk()
67        } else {
68            None
69        }
70    }
71
72    /// チャンクをフラッシュ
73    fn flush_chunk(&mut self) -> Option<Vec<T>> {
74        if self.buffer.is_empty() {
75            return None;
76        }
77
78        let chunk_size = self.chunk_size.min(self.buffer.len());
79        let chunk: Vec<T> = self.buffer.drain(0..chunk_size).collect();
80        self.total_processed += chunk.len();
81
82        Some(chunk)
83    }
84
85    /// 残りのデータを取得
86    pub fn finish(mut self) -> Option<Vec<T>> {
87        if self.buffer.is_empty() {
88            None
89        } else {
90            let remaining: Vec<T> = self.buffer.into_iter().collect();
91            self.total_processed += remaining.len();
92            Some(remaining)
93        }
94    }
95
96    /// 処理済み件数を取得
97    pub fn processed_count(&self) -> usize {
98        self.total_processed
99    }
100
101    /// バッファサイズを取得
102    pub fn buffer_size(&self) -> usize {
103        self.buffer.len()
104    }
105}
106
/// Chunk iterator.
///
/// Owns a vector and yields it as consecutive cloned chunks of at most
/// `chunk_size` items.
pub struct ChunkIterator<T> {
    data: Vec<T>,
    chunk_size: usize,
    current_index: usize,
}

impl<T> ChunkIterator<T> {
    /// Creates an iterator over `data` in chunks of `chunk_size` items.
    ///
    /// A `chunk_size` of 0 is treated as 1. With 0 the iterator would never
    /// advance and would yield empty chunks forever.
    pub fn new(data: Vec<T>, chunk_size: usize) -> Self {
        Self {
            data,
            chunk_size: chunk_size.max(1),
            current_index: 0,
        }
    }
}

impl<T: Clone> Iterator for ChunkIterator<T> {
    type Item = Vec<T>;

    /// Returns the next chunk, or `None` once all data has been yielded.
    /// The last chunk may be shorter than `chunk_size`.
    fn next(&mut self) -> Option<Self::Item> {
        let start = self.current_index;
        if start >= self.data.len() {
            return None;
        }

        // Saturating add keeps a huge chunk size from overflowing the index.
        let end = start.saturating_add(self.chunk_size).min(self.data.len());
        self.current_index = end;

        Some(self.data[start..end].to_vec())
    }
}
139
/// Memory-efficient statistics: a running summary that never stores the samples.
#[derive(Debug, Clone)]
pub struct IncrementalStatistics {
    count: usize,
    sum: f64,
    sum_squares: f64,
    min: f64,
    max: f64,
    m2: f64, // running sum of squared deviations, used for the variance
    mean: f64,
}

impl Default for IncrementalStatistics {
    /// Empty statistics: zero counters, `min` at +infinity and `max` at -infinity
    /// so the first sample replaces both.
    fn default() -> Self {
        IncrementalStatistics {
            count: 0,
            mean: 0.0,
            m2: 0.0,
            sum: 0.0,
            sum_squares: 0.0,
            min: f64::INFINITY,
            max: f64::NEG_INFINITY,
        }
    }
}

impl IncrementalStatistics {
    /// Creates empty statistics (same as `Default`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds one data point (Welford's online algorithm).
    pub fn add(&mut self, value: f64) {
        self.count += 1;

        // Welford update: the deviation is taken before and after moving the mean.
        let before = value - self.mean;
        self.mean += before / self.count as f64;
        let after = value - self.mean;
        self.m2 += before * after;

        self.sum += value;
        self.sum_squares += value * value;
        self.min = self.min.min(value);
        self.max = self.max.max(value);
    }

    /// Adds several data points, in slice order.
    pub fn add_batch(&mut self, values: &[f64]) {
        values.iter().for_each(|&value| self.add(value));
    }

    /// Merges another set of statistics into this one.
    ///
    /// An empty `other` is ignored. If `self` is empty it becomes a copy of `other`.
    pub fn merge(&mut self, other: &IncrementalStatistics) {
        match (self.count, other.count) {
            (_, 0) => {}
            (0, _) => *self = other.clone(),
            (own_n, other_n) => {
                let total_n = own_n + other_n;
                let (own_w, other_w, total_w) = (own_n as f64, other_n as f64, total_n as f64);

                let gap = other.mean - self.mean;
                let merged_mean = (own_w * self.mean + other_w * other.mean) / total_w;
                // Combine the M2 values, correcting for the gap between the two means.
                let merged_m2 = self.m2 + other.m2 + gap * gap * (own_w * other_w) / total_w;

                self.count = total_n;
                self.mean = merged_mean;
                self.m2 = merged_m2;
                self.sum += other.sum;
                self.sum_squares += other.sum_squares;
                self.min = self.min.min(other.min);
                self.max = self.max.max(other.max);
            }
        }
    }

    /// Returns the mean (0.0 while empty).
    pub fn mean(&self) -> f64 {
        self.mean
    }

    /// Returns the sample variance (divides by `count - 1`); 0.0 with fewer than two samples.
    pub fn variance(&self) -> f64 {
        match self.count {
            0 | 1 => 0.0,
            n => self.m2 / (n - 1) as f64,
        }
    }

    /// Returns the standard deviation, the square root of [`variance`](Self::variance).
    pub fn std_dev(&self) -> f64 {
        self.variance().sqrt()
    }

    /// Returns the number of samples.
    pub fn count(&self) -> usize {
        self.count
    }

    /// Returns the minimum (+infinity while empty).
    pub fn min(&self) -> f64 {
        self.min
    }

    /// Returns the maximum (-infinity while empty).
    pub fn max(&self) -> f64 {
        self.max
    }
}
257
258/// メモリ効率的なベンフォード分析
259#[derive(Debug, Clone, Default)]
260pub struct IncrementalBenford {
261    first_digit_counts: [usize; 9],
262    total_count: usize,
263}
264
265impl IncrementalBenford {
266    pub fn new() -> Self {
267        Self::default()
268    }
269
270    /// 数値を追加
271    pub fn add(&mut self, value: f64) {
272        let abs_value = value.abs();
273        if abs_value >= 1.0 {
274            let first_digit = get_first_digit(abs_value);
275            if (1..=9).contains(&first_digit) {
276                self.first_digit_counts[first_digit - 1] += 1;
277                self.total_count += 1;
278            }
279        }
280    }
281
282    /// 複数の数値を追加
283    pub fn add_batch(&mut self, values: &[f64]) {
284        for &value in values {
285            self.add(value);
286        }
287    }
288
289    /// 他のベンフォード統計と結合
290    pub fn merge(&mut self, other: &IncrementalBenford) {
291        for (i, &other_count) in other.first_digit_counts.iter().enumerate() {
292            self.first_digit_counts[i] += other_count;
293        }
294        self.total_count += other.total_count;
295    }
296
297    /// MAD(Mean Absolute Deviation)を計算
298    pub fn calculate_mad(&self) -> f64 {
299        if self.total_count == 0 {
300            return 0.0;
301        }
302
303        let expected_proportions = [
304            30.103, 17.609, 12.494, 9.691, 7.918, 6.695, 5.799, 5.115, 4.576,
305        ];
306
307        let mut mad = 0.0;
308        for (i, &expected) in expected_proportions.iter().enumerate() {
309            let observed_prop =
310                (self.first_digit_counts[i] as f64 / self.total_count as f64) * 100.0;
311            mad += (observed_prop - expected).abs();
312        }
313
314        mad / 9.0
315    }
316
317    /// 第一桁の分布を取得
318    pub fn get_distribution(&self) -> [f64; 9] {
319        let mut distribution = [0.0; 9];
320        if self.total_count > 0 {
321            for (i, item) in distribution.iter_mut().enumerate() {
322                *item = (self.first_digit_counts[i] as f64 / self.total_count as f64) * 100.0;
323            }
324        }
325        distribution
326    }
327
328    /// カウントを取得
329    pub fn get_counts(&self) -> &[usize; 9] {
330        &self.first_digit_counts
331    }
332
333    /// 総数を取得
334    pub fn total_count(&self) -> usize {
335        self.total_count
336    }
337}
338
/// Result of a chunk-based analysis, together with bookkeeping about the run.
#[derive(Clone, Debug)]
pub struct ChunkAnalysisResult<T> {
    /// Number of chunks that were processed.
    pub chunks_processed: usize,
    /// Number of items reported as processed.
    pub total_items: usize,
    /// Memory figure in megabytes, as computed by the producer of this result.
    pub memory_used_mb: f64,
    /// Processing time in milliseconds.
    pub processing_time_ms: u64,
    /// The analysis-specific payload.
    pub result: T,
}
348
349/// ストリーミングベンフォード分析
350pub fn streaming_benford_analysis<I>(
351    data_iter: I,
352    config: &MemoryConfig,
353) -> Result<ChunkAnalysisResult<IncrementalBenford>>
354where
355    I: Iterator<Item = f64>,
356{
357    let start_time = std::time::Instant::now();
358    let mut processor = StreamingProcessor::new(config);
359    let mut benford = IncrementalBenford::new();
360    let mut chunks_processed = 0;
361
362    for value in data_iter {
363        if let Some(chunk) = processor.push(value) {
364            let mut chunk_benford = IncrementalBenford::new();
365            chunk_benford.add_batch(&chunk);
366            benford.merge(&chunk_benford);
367            chunks_processed += 1;
368        }
369    }
370
371    // 処理済み件数を記録(finish()前にカウントを取得)
372    let mut total_processed = processor.processed_count();
373
374    // 残りのデータを処理
375    if let Some(remaining) = processor.finish() {
376        total_processed += remaining.len(); // 残りのデータ数を追加
377        let mut chunk_benford = IncrementalBenford::new();
378        chunk_benford.add_batch(&remaining);
379        benford.merge(&chunk_benford);
380        chunks_processed += 1;
381    }
382
383    let memory_used_mb = (total_processed * std::mem::size_of::<f64>()) as f64 / 1024.0 / 1024.0;
384
385    Ok(ChunkAnalysisResult {
386        chunks_processed,
387        total_items: total_processed,
388        memory_used_mb,
389        processing_time_ms: start_time.elapsed().as_millis() as u64,
390        result: benford,
391    })
392}
393
394/// ストリーミング統計分析
395pub fn streaming_statistics_analysis<I>(
396    data_iter: I,
397    config: &MemoryConfig,
398) -> Result<ChunkAnalysisResult<IncrementalStatistics>>
399where
400    I: Iterator<Item = f64>,
401{
402    let start_time = std::time::Instant::now();
403    let mut processor = StreamingProcessor::new(config);
404    let mut stats = IncrementalStatistics::new();
405    let mut chunks_processed = 0;
406
407    for value in data_iter {
408        if let Some(chunk) = processor.push(value) {
409            let mut chunk_stats = IncrementalStatistics::new();
410            chunk_stats.add_batch(&chunk);
411            stats.merge(&chunk_stats);
412            chunks_processed += 1;
413        }
414    }
415
416    // 処理済み件数を記録
417    let total_processed = processor.processed_count();
418
419    // 残りのデータを処理
420    if let Some(remaining) = processor.finish() {
421        let mut chunk_stats = IncrementalStatistics::new();
422        chunk_stats.add_batch(&remaining);
423        stats.merge(&chunk_stats);
424        chunks_processed += 1;
425    }
426
427    let memory_used_mb = (total_processed * std::mem::size_of::<f64>()) as f64 / 1024.0 / 1024.0;
428
429    Ok(ChunkAnalysisResult {
430        chunks_processed,
431        total_items: total_processed,
432        memory_used_mb,
433        processing_time_ms: start_time.elapsed().as_millis() as u64,
434        result: stats,
435    })
436}
437
438/// インクリメンタルパレート分析
439#[derive(Debug, Clone)]
440pub struct IncrementalPareto {
441    values: Vec<f64>,
442    sorted_values: Vec<f64>,
443    needs_sorting: bool,
444    statistics: IncrementalStatistics,
445}
446
447impl Default for IncrementalPareto {
448    fn default() -> Self {
449        Self::new()
450    }
451}
452
453impl IncrementalPareto {
454    pub fn new() -> Self {
455        Self {
456            values: Vec::new(),
457            sorted_values: Vec::new(),
458            needs_sorting: true,
459            statistics: IncrementalStatistics::new(),
460        }
461    }
462
463    pub fn add(&mut self, value: f64) {
464        self.values.push(value);
465        self.statistics.add(value);
466        self.needs_sorting = true;
467    }
468
469    pub fn add_batch(&mut self, values: &[f64]) {
470        for &value in values {
471            self.add(value);
472        }
473    }
474
475    pub fn merge(&mut self, other: &IncrementalPareto) {
476        self.values.extend_from_slice(&other.values);
477        self.statistics.merge(&other.statistics);
478        self.needs_sorting = true;
479    }
480
481    pub fn get_sorted_values(&mut self) -> &[f64] {
482        if self.needs_sorting {
483            self.sorted_values = self.values.clone();
484            self.sorted_values.sort_by(|a, b| b.partial_cmp(a).unwrap());
485            self.needs_sorting = false;
486        }
487        &self.sorted_values
488    }
489
490    pub fn statistics(&self) -> &IncrementalStatistics {
491        &self.statistics
492    }
493
494    pub fn count(&self) -> usize {
495        self.values.len()
496    }
497}
498
/// Incremental Zipf analysis: a running word-frequency table.
#[derive(Debug, Clone)]
pub struct IncrementalZipf {
    frequency_map: std::collections::HashMap<String, usize>,
    total_count: usize,
}

impl Default for IncrementalZipf {
    fn default() -> Self {
        Self::new()
    }
}

impl IncrementalZipf {
    /// Creates an empty frequency table.
    pub fn new() -> Self {
        Self {
            frequency_map: std::collections::HashMap::new(),
            total_count: 0,
        }
    }

    /// Counts one occurrence of `word`.
    pub fn add_word(&mut self, word: String) {
        *self.frequency_map.entry(word).or_insert(0) += 1;
        self.total_count += 1;
    }

    /// Counts one occurrence of every word in `words`.
    pub fn add_words(&mut self, words: &[String]) {
        for word in words {
            self.add_word(word.clone());
        }
    }

    /// Adds the counts of `other` to this table.
    pub fn merge(&mut self, other: &IncrementalZipf) {
        for (word, count) in &other.frequency_map {
            *self.frequency_map.entry(word.clone()).or_insert(0) += count;
        }
        self.total_count += other.total_count;
    }

    /// Returns `(word, count)` pairs sorted by descending count.
    ///
    /// Words with the same count are ordered alphabetically, so the result
    /// does not depend on the hash map's iteration order.
    pub fn get_sorted_frequencies(&self) -> Vec<(String, usize)> {
        let mut frequencies: Vec<_> = self
            .frequency_map
            .iter()
            .map(|(word, &count)| (word.clone(), count))
            .collect();
        // Words are unique keys, so the comparator never reports two entries
        // as equal and an unstable sort is still deterministic.
        frequencies.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
        frequencies
    }

    /// Returns the total number of counted words.
    pub fn total_count(&self) -> usize {
        self.total_count
    }

    /// Returns the number of distinct words.
    pub fn unique_words(&self) -> usize {
        self.frequency_map.len()
    }
}
556
557/// インクリメンタル正規分布分析
558#[derive(Debug, Clone)]
559pub struct IncrementalNormal {
560    statistics: IncrementalStatistics,
561    values: Vec<f64>, // 正規性検定に必要
562}
563
564impl Default for IncrementalNormal {
565    fn default() -> Self {
566        Self::new()
567    }
568}
569
570impl IncrementalNormal {
571    pub fn new() -> Self {
572        Self {
573            statistics: IncrementalStatistics::new(),
574            values: Vec::new(),
575        }
576    }
577
578    pub fn add(&mut self, value: f64) {
579        self.statistics.add(value);
580        self.values.push(value);
581    }
582
583    pub fn add_batch(&mut self, values: &[f64]) {
584        for &value in values {
585            self.add(value);
586        }
587    }
588
589    pub fn merge(&mut self, other: &IncrementalNormal) {
590        self.statistics.merge(&other.statistics);
591        self.values.extend_from_slice(&other.values);
592    }
593
594    pub fn statistics(&self) -> &IncrementalStatistics {
595        &self.statistics
596    }
597
598    pub fn values(&self) -> &[f64] {
599        &self.values
600    }
601
602    pub fn count(&self) -> usize {
603        self.statistics.count
604    }
605}
606
607/// インクリメンタルポアソン分析
608#[derive(Debug, Clone)]
609pub struct IncrementalPoisson {
610    event_counts: Vec<usize>,
611    statistics: IncrementalStatistics,
612}
613
614impl Default for IncrementalPoisson {
615    fn default() -> Self {
616        Self::new()
617    }
618}
619
620impl IncrementalPoisson {
621    pub fn new() -> Self {
622        Self {
623            event_counts: Vec::new(),
624            statistics: IncrementalStatistics::new(),
625        }
626    }
627
628    pub fn add_count(&mut self, count: usize) {
629        let count_f64 = count as f64;
630        self.event_counts.push(count);
631        self.statistics.add(count_f64);
632    }
633
634    pub fn add_counts(&mut self, counts: &[usize]) {
635        for &count in counts {
636            self.add_count(count);
637        }
638    }
639
640    pub fn merge(&mut self, other: &IncrementalPoisson) {
641        self.event_counts.extend_from_slice(&other.event_counts);
642        self.statistics.merge(&other.statistics);
643    }
644
645    pub fn statistics(&self) -> &IncrementalStatistics {
646        &self.statistics
647    }
648
649    pub fn event_counts(&self) -> &[usize] {
650        &self.event_counts
651    }
652
653    pub fn count(&self) -> usize {
654        self.event_counts.len()
655    }
656}
657
658/// ストリーミングパレート分析
659pub fn streaming_pareto_analysis<I>(
660    data_iter: I,
661    config: &MemoryConfig,
662) -> Result<ChunkAnalysisResult<IncrementalPareto>>
663where
664    I: Iterator<Item = f64>,
665{
666    let start_time = std::time::Instant::now();
667    let mut processor = StreamingProcessor::new(config);
668    let mut pareto = IncrementalPareto::new();
669    let mut chunks_processed = 0;
670
671    for value in data_iter {
672        if let Some(chunk) = processor.push(value) {
673            let mut chunk_pareto = IncrementalPareto::new();
674            chunk_pareto.add_batch(&chunk);
675            pareto.merge(&chunk_pareto);
676            chunks_processed += 1;
677        }
678    }
679
680    let mut total_processed = processor.processed_count();
681
682    if let Some(remaining) = processor.finish() {
683        total_processed += remaining.len();
684        let mut chunk_pareto = IncrementalPareto::new();
685        chunk_pareto.add_batch(&remaining);
686        pareto.merge(&chunk_pareto);
687        chunks_processed += 1;
688    }
689
690    let memory_used_mb = (total_processed * std::mem::size_of::<f64>()) as f64 / 1024.0 / 1024.0;
691
692    Ok(ChunkAnalysisResult {
693        chunks_processed,
694        total_items: total_processed,
695        memory_used_mb,
696        processing_time_ms: start_time.elapsed().as_millis() as u64,
697        result: pareto,
698    })
699}
700
701/// ストリーミング正規分布分析
702pub fn streaming_normal_analysis<I>(
703    data_iter: I,
704    config: &MemoryConfig,
705) -> Result<ChunkAnalysisResult<IncrementalNormal>>
706where
707    I: Iterator<Item = f64>,
708{
709    let start_time = std::time::Instant::now();
710    let mut processor = StreamingProcessor::new(config);
711    let mut normal = IncrementalNormal::new();
712    let mut chunks_processed = 0;
713
714    for value in data_iter {
715        if let Some(chunk) = processor.push(value) {
716            let mut chunk_normal = IncrementalNormal::new();
717            chunk_normal.add_batch(&chunk);
718            normal.merge(&chunk_normal);
719            chunks_processed += 1;
720        }
721    }
722
723    let mut total_processed = processor.processed_count();
724
725    if let Some(remaining) = processor.finish() {
726        total_processed += remaining.len();
727        let mut chunk_normal = IncrementalNormal::new();
728        chunk_normal.add_batch(&remaining);
729        normal.merge(&chunk_normal);
730        chunks_processed += 1;
731    }
732
733    let memory_used_mb = (total_processed * std::mem::size_of::<f64>()) as f64 / 1024.0 / 1024.0;
734
735    Ok(ChunkAnalysisResult {
736        chunks_processed,
737        total_items: total_processed,
738        memory_used_mb,
739        processing_time_ms: start_time.elapsed().as_millis() as u64,
740        result: normal,
741    })
742}
743
744/// ストリーミングポアソン分析
745pub fn streaming_poisson_analysis<I>(
746    data_iter: I,
747    config: &MemoryConfig,
748) -> Result<ChunkAnalysisResult<IncrementalPoisson>>
749where
750    I: Iterator<Item = usize>,
751{
752    let start_time = std::time::Instant::now();
753    let mut processor = StreamingProcessor::new(config);
754    let mut poisson = IncrementalPoisson::new();
755    let mut chunks_processed = 0;
756
757    for value in data_iter {
758        if let Some(chunk) = processor.push(value) {
759            let mut chunk_poisson = IncrementalPoisson::new();
760            chunk_poisson.add_counts(&chunk);
761            poisson.merge(&chunk_poisson);
762            chunks_processed += 1;
763        }
764    }
765
766    let mut total_processed = processor.processed_count();
767
768    if let Some(remaining) = processor.finish() {
769        total_processed += remaining.len();
770        let mut chunk_poisson = IncrementalPoisson::new();
771        chunk_poisson.add_counts(&remaining);
772        poisson.merge(&chunk_poisson);
773        chunks_processed += 1;
774    }
775
776    let memory_used_mb = (total_processed * std::mem::size_of::<usize>()) as f64 / 1024.0 / 1024.0;
777
778    Ok(ChunkAnalysisResult {
779        chunks_processed,
780        total_items: total_processed,
781        memory_used_mb,
782        processing_time_ms: start_time.elapsed().as_millis() as u64,
783        result: poisson,
784    })
785}
786
787/// ストリーミングZipf分析
788pub fn streaming_zipf_analysis<I>(
789    data_iter: I,
790    config: &MemoryConfig,
791) -> Result<ChunkAnalysisResult<IncrementalZipf>>
792where
793    I: Iterator<Item = String>,
794{
795    let start_time = std::time::Instant::now();
796    let mut processor = StreamingProcessor::new(config);
797    let mut zipf = IncrementalZipf::new();
798    let mut chunks_processed = 0;
799
800    for word in data_iter {
801        if let Some(chunk) = processor.push(word) {
802            let mut chunk_zipf = IncrementalZipf::new();
803            chunk_zipf.add_words(&chunk);
804            zipf.merge(&chunk_zipf);
805            chunks_processed += 1;
806        }
807    }
808
809    let mut total_processed = processor.processed_count();
810
811    if let Some(remaining) = processor.finish() {
812        total_processed += remaining.len();
813        let mut chunk_zipf = IncrementalZipf::new();
814        chunk_zipf.add_words(&remaining);
815        zipf.merge(&chunk_zipf);
816        chunks_processed += 1;
817    }
818
819    // String の平均サイズを推定(20バイトと仮定)
820    let memory_used_mb = (total_processed * 20) as f64 / 1024.0 / 1024.0;
821
822    Ok(ChunkAnalysisResult {
823        chunks_processed,
824        total_items: total_processed,
825        memory_used_mb,
826        processing_time_ms: start_time.elapsed().as_millis() as u64,
827        result: zipf,
828    })
829}
830
/// Resource usage monitor.
///
/// Pure bookkeeping: it only tracks the sizes reported through
/// `record_allocation` and `record_deallocation`.
#[derive(Debug, Clone, Default)]
pub struct ResourceMonitor {
    peak_memory_mb: f64,
    current_memory_mb: f64,
    allocation_count: usize,
}

impl ResourceMonitor {
    /// Creates a monitor with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Converts a byte count to megabytes (1 MB = 1024 * 1024 bytes).
    fn bytes_to_mb(size_bytes: usize) -> f64 {
        size_bytes as f64 / 1024.0 / 1024.0
    }

    /// Records an allocation of `size_bytes`, raising the peak if it is exceeded.
    pub fn record_allocation(&mut self, size_bytes: usize) {
        self.allocation_count += 1;
        self.current_memory_mb += Self::bytes_to_mb(size_bytes);
        self.peak_memory_mb = f64::max(self.peak_memory_mb, self.current_memory_mb);
    }

    /// Records a release of `size_bytes`; current usage never drops below zero.
    pub fn record_deallocation(&mut self, size_bytes: usize) {
        let released_mb = Self::bytes_to_mb(size_bytes);
        self.current_memory_mb -= released_mb;
        self.current_memory_mb = f64::max(self.current_memory_mb, 0.0);
    }

    /// Returns the peak memory usage in megabytes.
    pub fn peak_memory_mb(&self) -> f64 {
        self.peak_memory_mb
    }

    /// Returns the current memory usage in megabytes.
    pub fn current_memory_mb(&self) -> f64 {
        self.current_memory_mb
    }

    /// Returns the number of recorded allocations.
    pub fn allocation_count(&self) -> usize {
        self.allocation_count
    }
}
884
// Helper functions

/// Returns the leading decimal digit of `value` by dividing by ten until the
/// value is below 10 and truncating.
///
/// Intended for values of at least 1.0. Values below 1.0 (including negative
/// ones), NaN and infinities return 0.
fn get_first_digit(value: f64) -> usize {
    // Infinity divided by ten is still infinity, so the loop below would
    // never end for it.
    if !value.is_finite() {
        return 0;
    }

    let mut n = value;
    while n >= 10.0 {
        n /= 10.0;
    }
    n as usize
}
893
/// Memory-efficiency tests
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the configuration used by the streaming tests; only the chunk size varies.
    fn config_with_chunk_size(chunk_size: usize) -> MemoryConfig {
        MemoryConfig {
            chunk_size,
            max_memory_mb: 100,
            enable_streaming: true,
            enable_compression: false,
        }
    }

    #[test]
    fn test_incremental_statistics() {
        let mut stats = IncrementalStatistics::new();
        stats.add_batch(&[1.0, 2.0, 3.0, 4.0, 5.0]);

        assert_eq!(stats.count(), 5);
        assert!((stats.mean() - 3.0).abs() < 1e-10);
        assert!((stats.variance() - 2.5).abs() < 1e-10);
    }

    #[test]
    fn test_incremental_benford() {
        let mut benford = IncrementalBenford::new();
        benford.add_batch(&[100.0, 200.0, 300.0, 111.0, 222.0]);

        assert_eq!(benford.total_count(), 5);

        let distribution = benford.get_distribution();
        // Leading digits 1 and 2 both occur in the input.
        assert!(distribution[0] > 0.0);
        assert!(distribution[1] > 0.0);
    }

    #[test]
    fn test_streaming_processor() {
        let config = config_with_chunk_size(3);
        let mut processor = StreamingProcessor::new(&config);

        // The first two pushes stay below the chunk size.
        assert_eq!(processor.push(1.0), None);
        assert_eq!(processor.push(2.0), None);

        // The third push fills the buffer and triggers a flush.
        let flushed = processor.push(3.0);
        assert!(flushed.is_some());
        let flushed = flushed.unwrap();
        assert_eq!(flushed.len(), 3);
        assert_eq!(flushed, vec![1.0, 2.0, 3.0]);

        assert_eq!(processor.processed_count(), 3);

        // Two more items are buffered without being flushed.
        assert_eq!(processor.push(4.0), None);
        assert_eq!(processor.push(5.0), None);

        // `finish` hands back what is still buffered.
        let leftover = processor.finish();
        assert!(leftover.is_some());
        assert_eq!(leftover.unwrap(), vec![4.0, 5.0]);
    }

    #[test]
    fn test_chunk_iterator() {
        let source = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0];
        let chunks: Vec<Vec<f64>> = ChunkIterator::new(source, 3).collect();

        // Two full chunks followed by a single-item tail, then exhaustion.
        assert_eq!(
            chunks,
            vec![vec![1.0, 2.0, 3.0], vec![4.0, 5.0, 6.0], vec![7.0]]
        );
    }

    #[test]
    fn test_incremental_statistics_merge() {
        let mut left = IncrementalStatistics::new();
        left.add_batch(&[1.0, 2.0, 3.0]);

        let mut right = IncrementalStatistics::new();
        right.add_batch(&[4.0, 5.0, 6.0]);

        left.merge(&right);

        // All six values are represented; the mean of 1..=6 is 3.5.
        assert_eq!(left.count(), 6);
        assert!((left.mean() - 3.5).abs() < 1e-10);
        assert!(left.variance() > 0.0);
    }

    #[test]
    fn test_incremental_benford_merge() {
        let mut left = IncrementalBenford::new();
        left.add_batch(&[100.0, 200.0]);

        let mut right = IncrementalBenford::new();
        right.add_batch(&[300.0, 111.0]);

        let expected_total = left.total_count() + right.total_count();
        left.merge(&right);

        assert_eq!(left.total_count(), expected_total);
        assert!(left.calculate_mad() >= 0.0);
    }

    #[test]
    fn test_streaming_benford_analysis() {
        // A small chunk size forces several chunks even for tiny inputs.
        let config = config_with_chunk_size(3);
        let input = vec![100.0, 200.0, 300.0, 111.0, 222.0, 333.0, 444.0];

        let outcome = streaming_benford_analysis(input.into_iter(), &config).unwrap();

        assert!(outcome.chunks_processed >= 1);
        assert!(outcome.total_items > 0);
        assert!(outcome.memory_used_mb > 0.0);
        assert!(outcome.result.total_count() > 0);
    }

    #[test]
    fn test_streaming_statistics_analysis() {
        // A small chunk size forces several chunks even for tiny inputs.
        let config = config_with_chunk_size(4);
        let input = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0];

        let outcome = streaming_statistics_analysis(input.into_iter(), &config).unwrap();

        // Only loose lower bounds are asserted here; exact totals and the mean
        // are deliberately not checked.
        assert!(outcome.chunks_processed >= 1);
        assert!(outcome.total_items > 0);
        assert!(outcome.memory_used_mb > 0.0);
        assert!(outcome.result.count() > 0);
    }

    #[test]
    fn test_resource_monitor() {
        let mut monitor = ResourceMonitor::new();

        assert_eq!(monitor.peak_memory_mb(), 0.0);
        assert_eq!(monitor.current_memory_mb(), 0.0);
        assert_eq!(monitor.allocation_count(), 0);

        // Allocate 1 MB.
        monitor.record_allocation(1024 * 1024);
        assert_eq!(monitor.allocation_count(), 1);
        assert!(monitor.current_memory_mb() > 0.0);
        assert!(monitor.peak_memory_mb() > 0.0);

        // Release 0.5 MB: current usage drops but stays positive, the peak stays.
        monitor.record_deallocation(512 * 1024);
        assert!(monitor.current_memory_mb() > 0.0);
        assert!(monitor.current_memory_mb() < monitor.peak_memory_mb());
    }
}
1063}