// scirs2_stats/memory_efficient.rs

1//! Memory-efficient statistical operations
2//!
3//! This module provides memory-optimized implementations of statistical functions
4//! that minimize allocations and use streaming/chunked processing for large datasets.
5
6use crate::error::{StatsError, StatsResult};
7use crate::error_standardization::ErrorMessages;
8#[cfg(feature = "memmap")]
9use memmap2::Mmap;
10use scirs2_core::ndarray::{s, ArrayBase, ArrayViewMut1, Data, Ix1, Ix2};
11use scirs2_core::numeric::{Float, NumCast};
12use std::cmp::Ordering;
13use std::fs::File;
14use std::io::{BufRead, BufReader, Read};
15use std::path::Path;
16
17/// Chunk size for streaming operations (tuned for cache efficiency)
18const CHUNK_SIZE: usize = 8192;
19
20/// Streaming mean calculation that processes data in chunks
21///
22/// This function computes the mean without loading the entire dataset into memory
23/// at once, making it suitable for very large datasets.
24///
25/// # Arguments
26///
27/// * `data_iter` - Iterator over data chunks
28/// * `total_count` - Total number of elements across all chunks
29///
30/// # Returns
31///
32/// The arithmetic mean
33#[allow(dead_code)]
34pub fn streaming_mean<F, I>(mut data_iter: I, totalcount: usize) -> StatsResult<F>
35where
36    F: Float + NumCast,
37    I: Iterator<Item = F> + std::fmt::Display,
38{
39    if totalcount == 0 {
40        return Err(ErrorMessages::empty_array("dataset"));
41    }
42
43    let mut sum = F::zero();
44    let mut _count = 0;
45
46    // Process in chunks to maintain precision
47    while _count < totalcount {
48        let chunk_sum = data_iter
49            .by_ref()
50            .take(CHUNK_SIZE)
51            .fold(F::zero(), |acc, val| acc + val);
52
53        sum = sum + chunk_sum;
54        _count += CHUNK_SIZE.min(totalcount - _count);
55    }
56
57    Ok(sum / F::from(totalcount).expect("Failed to convert to float"))
58}
59
60/// Welford's online algorithm for variance computation
61///
62/// This algorithm computes variance in a single pass with minimal memory usage
63/// and improved numerical stability.
64///
65/// # Arguments
66///
67/// * `x` - Input data array
68/// * `ddof` - Delta degrees of freedom
69///
70/// # Returns
71///
72/// * Tuple of (mean, variance)
73#[allow(dead_code)]
74pub fn welford_variance<F, D>(x: &ArrayBase<D, Ix1>, ddof: usize) -> StatsResult<(F, F)>
75where
76    F: Float + NumCast,
77    D: Data<Elem = F>,
78{
79    let n = x.len();
80    if n <= ddof {
81        return Err(StatsError::InvalidArgument(
82            "Not enough data points for the given degrees of freedom".to_string(),
83        ));
84    }
85
86    let mut mean = F::zero();
87    let mut m2 = F::zero();
88    let mut count = 0;
89
90    for &value in x.iter() {
91        count += 1;
92        let delta = value - mean;
93        mean = mean + delta / F::from(count).expect("Failed to convert to float");
94        let delta2 = value - mean;
95        m2 = m2 + delta * delta2;
96    }
97
98    let variance = m2 / F::from(n - ddof).expect("Failed to convert to float");
99    Ok((mean, variance))
100}
101
102/// In-place normalization (standardization) of data
103///
104/// This function normalizes data in-place to have zero mean and unit variance,
105/// avoiding the need for additional memory allocation.
106///
107/// # Arguments
108///
109/// * `data` - Mutable array to normalize
110/// * `ddof` - Delta degrees of freedom for variance calculation
111#[allow(dead_code)]
112pub fn normalize_inplace<F>(data: &mut ArrayViewMut1<F>, ddof: usize) -> StatsResult<()>
113where
114    F: Float + NumCast + std::fmt::Display,
115{
116    let (mean, variance) = welford_variance(&data.to_owned(), ddof)?;
117
118    if variance <= F::epsilon() {
119        return Err(StatsError::InvalidArgument(
120            "Cannot normalize data with zero variance".to_string(),
121        ));
122    }
123
124    let std_dev = variance.sqrt();
125
126    // Normalize in-place
127    for val in data.iter_mut() {
128        *val = (*val - mean) / std_dev;
129    }
130
131    Ok(())
132}
133
134/// Memory-efficient quantile computation using quickselect
135///
136/// This function computes quantiles without fully sorting the array,
137/// which saves memory and time for large datasets.
138///
139/// # Arguments
140///
141/// * `data` - Input data array (will be partially reordered)
142/// * `q` - Quantile to compute (0 to 1)
143///
144/// # Returns
145///
146/// The computed quantile value
147#[allow(dead_code)]
148pub fn quantile_quickselect<F>(data: &mut [F], q: F) -> StatsResult<F>
149where
150    F: Float + NumCast + std::fmt::Display,
151{
152    if data.is_empty() {
153        return Err(StatsError::InvalidArgument(
154            "Cannot compute quantile of empty array".to_string(),
155        ));
156    }
157
158    if q < F::zero() || q > F::one() {
159        return Err(StatsError::domain("Quantile must be between 0 and 1"));
160    }
161
162    let n = data.len();
163    let pos = q * F::from(n - 1).expect("Failed to convert to float");
164    let k = NumCast::from(pos.floor()).expect("Operation failed");
165
166    // Use quickselect to find k-th element
167    quickselect(data, k);
168
169    let lower = data[k];
170
171    // Handle interpolation if needed
172    let frac = pos - pos.floor();
173    if frac > F::zero() && k + 1 < n {
174        quickselect(&mut data[(k + 1)..], 0);
175        let upper = data[k + 1];
176        Ok(lower + frac * (upper - lower))
177    } else {
178        Ok(lower)
179    }
180}
181
182/// Quickselect algorithm for finding k-th smallest element
183#[allow(dead_code)]
184fn quickselect<F: Float>(data: &mut [F], k: usize) {
185    let len = data.len();
186    if len <= 1 {
187        return;
188    }
189
190    let mut left = 0;
191    let mut right = len - 1;
192
193    while left < right {
194        let pivot_idx = partition(data, left, right);
195
196        match k.cmp(&pivot_idx) {
197            Ordering::Less => right = pivot_idx - 1,
198            Ordering::Greater => left = pivot_idx + 1,
199            Ordering::Equal => return,
200        }
201    }
202}
203
204/// Partition function for quickselect
205#[allow(dead_code)]
206fn partition<F: Float>(data: &mut [F], left: usize, right: usize) -> usize {
207    let pivot_idx = left + (right - left) / 2;
208    let pivot = data[pivot_idx];
209
210    data.swap(pivot_idx, right);
211
212    let mut store_idx = left;
213    for i in left..right {
214        if data[i] < pivot {
215            data.swap(i, store_idx);
216            store_idx += 1;
217        }
218    }
219
220    data.swap(store_idx, right);
221    store_idx
222}
223
224/// Memory-efficient covariance matrix computation
225///
226/// Computes covariance matrix using a streaming algorithm that processes
227/// data in chunks to minimize memory usage.
228///
229/// # Arguments
230///
231/// * `data` - 2D array where columns are variables
232/// * `ddof` - Delta degrees of freedom
233///
234/// # Returns
235///
236/// Covariance matrix
237#[allow(dead_code)]
238pub fn covariance_chunked<F, D>(
239    data: &ArrayBase<D, Ix2>,
240    ddof: usize,
241) -> StatsResult<scirs2_core::ndarray::Array2<F>>
242where
243    F: Float + NumCast,
244    D: Data<Elem = F>,
245{
246    let n_obs = data.nrows();
247    let n_vars = data.ncols();
248
249    if n_obs <= ddof {
250        return Err(StatsError::InvalidArgument(
251            "Not enough observations for the given degrees of freedom".to_string(),
252        ));
253    }
254
255    // Compute means for each variable
256    let mut means = scirs2_core::ndarray::Array1::zeros(n_vars);
257    for j in 0..n_vars {
258        let col = data.slice(s![.., j]);
259        means[j] = col.iter().fold(F::zero(), |acc, &val| acc + val)
260            / F::from(n_obs).expect("Failed to convert to float");
261    }
262
263    // Initialize covariance matrix
264    let mut cov_matrix = scirs2_core::ndarray::Array2::zeros((n_vars, n_vars));
265
266    // Process data in chunks to compute covariance
267    let chunksize = CHUNK_SIZE / n_vars;
268    for chunk_start in (0..n_obs).step_by(chunksize) {
269        let chunk_end = (chunk_start + chunksize).min(n_obs);
270        let chunk = data.slice(s![chunk_start..chunk_end, ..]);
271
272        // Update covariance for this chunk
273        for i in 0..n_vars {
274            for j in i..n_vars {
275                let mut sum = F::zero();
276                for k in 0..chunk.nrows() {
277                    let xi = chunk[(k, i)] - means[i];
278                    let xj = chunk[(k, j)] - means[j];
279                    sum = sum + xi * xj;
280                }
281                cov_matrix[(i, j)] = cov_matrix[(i, j)] + sum;
282            }
283        }
284    }
285
286    // Normalize and fill symmetric entries
287    let factor = F::from(n_obs - ddof).expect("Failed to convert to float");
288    for i in 0..n_vars {
289        for j in i..n_vars {
290            cov_matrix[(i, j)] = cov_matrix[(i, j)] / factor;
291            if i != j {
292                cov_matrix[(j, i)] = cov_matrix[(i, j)];
293            }
294        }
295    }
296
297    Ok(cov_matrix)
298}
299
300/// Streaming correlation computation for large datasets
301///
302/// Computes correlation between two variables without loading all data at once.
303#[allow(dead_code)]
304pub struct StreamingCorrelation<F: Float> {
305    n: usize,
306    sum_x: F,
307    sum_y: F,
308    sum_xx: F,
309    sum_yy: F,
310    sum_xy: F,
311}
312
313#[allow(dead_code)]
314impl<F: Float + NumCast + std::fmt::Display> StreamingCorrelation<F> {
315    /// Create a new streaming correlation calculator
316    pub fn new() -> Self {
317        Self {
318            n: 0,
319            sum_x: F::zero(),
320            sum_y: F::zero(),
321            sum_xx: F::zero(),
322            sum_yy: F::zero(),
323            sum_xy: F::zero(),
324        }
325    }
326
327    /// Update with a new pair of values
328    pub fn update(&mut self, x: F, y: F) {
329        self.n += 1;
330        self.sum_x = self.sum_x + x;
331        self.sum_y = self.sum_y + y;
332        self.sum_xx = self.sum_xx + x * x;
333        self.sum_yy = self.sum_yy + y * y;
334        self.sum_xy = self.sum_xy + x * y;
335    }
336
337    /// Update with arrays of values
338    pub fn update_batch<D>(&mut self, x: &ArrayBase<D, Ix1>, y: &ArrayBase<D, Ix1>)
339    where
340        D: Data<Elem = F>,
341    {
342        for (&xi, &yi) in x.iter().zip(y.iter()) {
343            self.update(xi, yi);
344        }
345    }
346
347    /// Compute the correlation coefficient
348    pub fn correlation(&self) -> StatsResult<F> {
349        if self.n < 2 {
350            return Err(StatsError::InvalidArgument(
351                "Need at least 2 observations to compute correlation".to_string(),
352            ));
353        }
354
355        let n = F::from(self.n).expect("Failed to convert to float");
356        let mean_x = self.sum_x / n;
357        let mean_y = self.sum_y / n;
358
359        let cov_xy = (self.sum_xy - n * mean_x * mean_y) / (n - F::one());
360        let var_x = (self.sum_xx - n * mean_x * mean_x) / (n - F::one());
361        let var_y = (self.sum_yy - n * mean_y * mean_y) / (n - F::one());
362
363        if var_x <= F::epsilon() || var_y <= F::epsilon() {
364            return Err(StatsError::InvalidArgument(
365                "Cannot compute correlation when one or both variables have zero variance"
366                    .to_string(),
367            ));
368        }
369
370        Ok(cov_xy / (var_x * var_y).sqrt())
371    }
372
373    /// Merge with another streaming correlation
374    pub fn merge(&mut self, other: &Self) {
375        self.n += other.n;
376        self.sum_x = self.sum_x + other.sum_x;
377        self.sum_y = self.sum_y + other.sum_y;
378        self.sum_xx = self.sum_xx + other.sum_xx;
379        self.sum_yy = self.sum_yy + other.sum_yy;
380        self.sum_xy = self.sum_xy + other.sum_xy;
381    }
382}
383
384/// Incremental covariance matrix computation
385///
386/// Updates covariance matrix incrementally as new observations arrive.
387#[allow(dead_code)]
388pub struct IncrementalCovariance<F: Float> {
389    n: usize,
390    means: scirs2_core::ndarray::Array1<F>,
391    cov_matrix: scirs2_core::ndarray::Array2<F>,
392    n_vars: usize,
393}
394
395#[allow(dead_code)]
396impl<F: Float + NumCast + scirs2_core::ndarray::ScalarOperand + std::fmt::Display>
397    IncrementalCovariance<F>
398{
399    /// Create a new incremental covariance calculator
400    pub fn new(_nvars: usize) -> Self {
401        Self {
402            n: 0,
403            means: scirs2_core::ndarray::Array1::zeros(_nvars),
404            cov_matrix: scirs2_core::ndarray::Array2::zeros((_nvars, _nvars)),
405            n_vars: _nvars,
406        }
407    }
408
409    /// Update with a new observation
410    pub fn update(&mut self, observation: &scirs2_core::ndarray::ArrayView1<F>) -> StatsResult<()> {
411        if observation.len() != self.n_vars {
412            return Err(StatsError::DimensionMismatch(
413                "Observation dimension doesn't match".to_string(),
414            ));
415        }
416
417        self.n += 1;
418        let n = F::from(self.n).expect("Failed to convert to float");
419
420        // Update means and covariance using Welford's algorithm
421        let mut delta = scirs2_core::ndarray::Array1::zeros(self.n_vars);
422
423        for i in 0..self.n_vars {
424            delta[i] = observation[i] - self.means[i];
425            self.means[i] = self.means[i] + delta[i] / n;
426        }
427
428        if self.n > 1 {
429            for i in 0..self.n_vars {
430                for j in i..self.n_vars {
431                    let delta_new = observation[j] - self.means[j];
432                    let cov_update = delta[i] * delta_new * (n - F::one()) / n;
433                    self.cov_matrix[(i, j)] = self.cov_matrix[(i, j)] + cov_update;
434                    if i != j {
435                        self.cov_matrix[(j, i)] = self.cov_matrix[(i, j)];
436                    }
437                }
438            }
439        }
440
441        Ok(())
442    }
443
444    /// Get current covariance matrix
445    pub fn covariance(&self, ddof: usize) -> StatsResult<scirs2_core::ndarray::Array2<F>> {
446        if self.n <= ddof {
447            return Err(StatsError::InvalidArgument(
448                "Not enough observations for the given degrees of freedom".to_string(),
449            ));
450        }
451
452        let factor = F::from(self.n - ddof).expect("Failed to convert to float");
453        Ok(&self.cov_matrix / factor)
454    }
455
456    /// Get current means
457    pub fn means(&self) -> &scirs2_core::ndarray::Array1<F> {
458        &self.means
459    }
460}
461
462/// Memory-efficient rolling window statistics
463///
464/// Computes statistics over a sliding window without storing all data.
465#[allow(dead_code)]
466pub struct RollingStats<F: Float> {
467    windowsize: usize,
468    buffer: Vec<F>,
469    position: usize,
470    is_full: bool,
471    sum: F,
472    sum_squares: F,
473}
474
475#[allow(dead_code)]
476impl<F: Float + NumCast + std::fmt::Display> RollingStats<F> {
477    /// Create a new rolling statistics calculator
478    pub fn new(_windowsize: usize) -> StatsResult<Self> {
479        if _windowsize == 0 {
480            return Err(StatsError::InvalidArgument(
481                "Window size must be positive".to_string(),
482            ));
483        }
484
485        Ok(Self {
486            windowsize: _windowsize,
487            buffer: vec![F::zero(); _windowsize],
488            position: 0,
489            is_full: false,
490            sum: F::zero(),
491            sum_squares: F::zero(),
492        })
493    }
494
495    /// Add a new value to the rolling window
496    pub fn push(&mut self, value: F) {
497        let old_value = self.buffer[self.position];
498
499        // Update running sums
500        self.sum = self.sum - old_value + value;
501        self.sum_squares = self.sum_squares - old_value * old_value + value * value;
502
503        // Store new value
504        self.buffer[self.position] = value;
505        self.position = (self.position + 1) % self.windowsize;
506
507        if !self.is_full && self.position == 0 {
508            self.is_full = true;
509        }
510    }
511
512    /// Get current window size
513    pub fn len(&self) -> usize {
514        if self.is_full {
515            self.windowsize
516        } else {
517            self.position
518        }
519    }
520
521    /// Compute mean of current window
522    pub fn mean(&self) -> F {
523        let n = self.len();
524        if n == 0 {
525            F::zero()
526        } else {
527            self.sum / F::from(n).expect("Failed to convert to float")
528        }
529    }
530
531    /// Compute variance of current window
532    pub fn variance(&self, ddof: usize) -> StatsResult<F> {
533        let n = self.len();
534        if n <= ddof {
535            return Err(StatsError::InvalidArgument(
536                "Not enough data for the given degrees of freedom".to_string(),
537            ));
538        }
539
540        let mean = self.mean();
541        let n_f = F::from(n).expect("Failed to convert to float");
542        let variance = (self.sum_squares / n_f) - mean * mean;
543        Ok(variance * n_f / F::from(n - ddof).expect("Failed to convert to float"))
544    }
545
546    /// Get current buffer as array
547    pub fn as_array(&self) -> scirs2_core::ndarray::Array1<F> {
548        if self.is_full {
549            scirs2_core::ndarray::Array1::from_vec(self.buffer.clone())
550        } else {
551            scirs2_core::ndarray::Array1::from_vec(self.buffer[..self.position].to_vec())
552        }
553    }
554}
555
556/// Memory-efficient histogram computation
557///
558/// Computes histogram without storing all data, using a streaming approach.
559pub struct StreamingHistogram<F: Float> {
560    bins: Vec<F>,
561    counts: Vec<usize>,
562    min_val: F,
563    max_val: F,
564    total_count: usize,
565}
566
567impl<F: Float + NumCast + std::fmt::Display> StreamingHistogram<F> {
568    /// Create a new streaming histogram
569    pub fn new(_n_bins: usize, min_val: F, maxval: F) -> Self {
570        let bin_width = (maxval - min_val) / F::from(_n_bins).expect("Failed to convert to float");
571        let bins: Vec<F> = (0..=_n_bins)
572            .map(|i| min_val + F::from(i).expect("Failed to convert to float") * bin_width)
573            .collect();
574
575        Self {
576            bins,
577            counts: vec![0; _n_bins],
578            min_val,
579            max_val: maxval,
580            total_count: 0,
581        }
582    }
583
584    /// Add a value to the histogram
585    pub fn add_value(&mut self, value: F) {
586        if value >= self.min_val && value <= self.max_val {
587            let n_bins = self.counts.len();
588            let bin_width = (self.max_val - self.min_val)
589                / F::from(n_bins).expect("Failed to convert to float");
590            let bin_idx = ((value - self.min_val) / bin_width).floor();
591            let bin_idx: usize = NumCast::from(bin_idx).unwrap_or(n_bins - 1).min(n_bins - 1);
592            self.counts[bin_idx] += 1;
593            self.total_count += 1;
594        }
595    }
596
597    /// Add multiple values
598    pub fn add_values<D>(&mut self, values: &ArrayBase<D, Ix1>)
599    where
600        D: Data<Elem = F>,
601    {
602        for &value in values.iter() {
603            self.add_value(value);
604        }
605    }
606
607    /// Get the histogram results
608    pub fn get_histogram(&self) -> (Vec<F>, Vec<usize>) {
609        (self.bins.clone(), self.counts.clone())
610    }
611
612    /// Get normalized histogram (density)
613    pub fn get_density(&self) -> (Vec<F>, Vec<F>) {
614        let n_bins = self.counts.len();
615        let bin_width =
616            (self.max_val - self.min_val) / F::from(n_bins).expect("Failed to convert to float");
617        let total = F::from(self.total_count).expect("Failed to convert to float") * bin_width;
618
619        let density: Vec<F> = self
620            .counts
621            .iter()
622            .map(|&count| F::from(count).expect("Failed to convert to float") / total)
623            .collect();
624
625        (self.bins.clone(), density)
626    }
627}
628
629/// Out-of-core statistics for datasets larger than memory
630///
631/// Processes data from files in chunks without loading entire dataset.
632#[allow(dead_code)]
633pub struct OutOfCoreStats<F: Float> {
634    chunksize: usize,
635    _phantom: std::marker::PhantomData<F>,
636}
637
638#[allow(dead_code)]
639impl<F: Float + NumCast + std::str::FromStr + std::fmt::Display> OutOfCoreStats<F> {
640    /// Create a new out-of-core statistics processor
641    pub fn new(_chunksize: usize) -> Self {
642        Self {
643            chunksize: _chunksize,
644            _phantom: std::marker::PhantomData,
645        }
646    }
647
648    /// Compute mean from a CSV file
649    pub fn mean_from_csv<P: AsRef<Path>>(
650        &self,
651        path: P,
652        column: usize,
653        has_header: bool,
654    ) -> StatsResult<F> {
655        let file = File::open(path)
656            .map_err(|e| StatsError::InvalidArgument(format!("Failed to open file: {e}")))?;
657        let mut reader = BufReader::new(file);
658
659        let mut sum = F::zero();
660        let mut count = 0;
661        let mut line = String::new();
662        let mut line_num = 0;
663
664        // Skip _header if present
665        if has_header {
666            reader
667                .read_line(&mut line)
668                .map_err(|e| StatsError::InvalidArgument(format!("Failed to read header: {e}")))?;
669            line.clear();
670        }
671
672        // Process file in lines
673        loop {
674            match reader.read_line(&mut line) {
675                Ok(0) => break, // EOF
676                Ok(_) => {
677                    line_num += 1;
678                    let fields: Vec<&str> = line.trim().split(',').collect();
679
680                    if fields.len() > column {
681                        if let Ok(value) = fields[column].parse::<F>() {
682                            sum = sum + value;
683                            count += 1;
684                        }
685                    }
686
687                    line.clear();
688                }
689                Err(e) => {
690                    return Err(StatsError::ComputationError(format!(
691                        "Error reading line {line_num}: {e}"
692                    )))
693                }
694            }
695        }
696
697        if count == 0 {
698            return Err(StatsError::InvalidArgument(
699                "No valid data found".to_string(),
700            ));
701        }
702
703        Ok(sum / F::from(count).expect("Failed to convert to float"))
704    }
705
706    /// Compute variance from a CSV file using two-pass algorithm
707    pub fn variance_from_csv<P: AsRef<Path>>(
708        &self,
709        path: P,
710        column: usize,
711        has_header: bool,
712        ddof: usize,
713    ) -> StatsResult<(F, F)> {
714        // First pass: compute mean
715        let mean = self.mean_from_csv(&path, column, has_header)?;
716
717        // Second pass: compute variance
718        let file = File::open(path)
719            .map_err(|e| StatsError::InvalidArgument(format!("Failed to open file: {e}")))?;
720        let mut reader = BufReader::new(file);
721
722        let mut sum_sq = F::zero();
723        let mut count = 0;
724        let mut line = String::new();
725
726        // Skip _header if present
727        if has_header {
728            reader
729                .read_line(&mut line)
730                .map_err(|e| StatsError::InvalidArgument(format!("Failed to read header: {e}")))?;
731            line.clear();
732        }
733
734        // Process file
735        loop {
736            match reader.read_line(&mut line) {
737                Ok(0) => break, // EOF
738                Ok(_) => {
739                    let fields: Vec<&str> = line.trim().split(',').collect();
740
741                    if fields.len() > column {
742                        if let Ok(value) = fields[column].parse::<F>() {
743                            let diff = value - mean;
744                            sum_sq = sum_sq + diff * diff;
745                            count += 1;
746                        }
747                    }
748
749                    line.clear();
750                }
751                Err(e) => {
752                    return Err(StatsError::ComputationError(format!(
753                        "Error reading file: {}",
754                        e
755                    )))
756                }
757            }
758        }
759
760        if count <= ddof {
761            return Err(StatsError::InvalidArgument(
762                "Not enough data for the given degrees of freedom".to_string(),
763            ));
764        }
765
766        let variance = sum_sq
767            / F::from(count - ddof).ok_or_else(|| {
768                StatsError::ComputationError(
769                    "Failed to convert count - ddof to target type".to_string(),
770                )
771            })?;
772        Ok((mean, variance))
773    }
774
775    /// Process large binary file of floats
776    pub fn process_binary_file<P: AsRef<Path>, G>(
777        &self,
778        path: P,
779        mut processor: G,
780    ) -> StatsResult<()>
781    where
782        G: FnMut(&[F]) -> StatsResult<()>,
783    {
784        use std::mem;
785
786        let file = File::open(path)
787            .map_err(|e| StatsError::InvalidArgument(format!("Failed to open file: {e}")))?;
788        let mut reader = BufReader::new(file);
789
790        let elementsize = mem::size_of::<F>();
791        let buffersize = self.chunksize * elementsize;
792        let mut buffer = vec![0u8; buffersize];
793
794        loop {
795            match reader.read(&mut buffer) {
796                Ok(0) => break, // EOF
797                Ok(bytes_read) => {
798                    let n_elements = bytes_read / elementsize;
799
800                    // Convert bytes to floats (unsafe but efficient)
801                    let floats = unsafe {
802                        std::slice::from_raw_parts(buffer.as_ptr() as *const F, n_elements)
803                    };
804
805                    processor(floats)?;
806                }
807                Err(e) => {
808                    return Err(StatsError::ComputationError(format!(
809                        "Error reading file: {}",
810                        e
811                    )))
812                }
813            }
814        }
815
816        Ok(())
817    }
818}
819
/// Memory-mapped statistics for very large files.
///
/// Uses memory-mapped I/O so the OS pages data in on demand instead of the
/// process buffering it explicitly.
#[cfg(feature = "memmap")]
pub struct MemoryMappedStats<F: Float> {
    // Ties the processor to its element type without storing any `F`.
    _phantom: std::marker::PhantomData<F>,
}
827
#[cfg(feature = "memmap")]
impl<F: Float + NumCast + std::fmt::Display> MemoryMappedStats<F> {
    /// Create a new memory-mapped statistics processor.
    pub fn new() -> Self {
        Self {
            _phantom: std::marker::PhantomData,
        }
    }

    /// Map `path` into memory and hand its contents to `processor` as a
    /// slice of `F` values; trailing bytes that do not fill a whole
    /// element are ignored.
    pub fn process_mmap_file<P: AsRef<Path>, G>(&self, path: P, processor: G) -> StatsResult<()>
    where
        G: FnOnce(&[F]) -> StatsResult<()>,
    {
        let file = File::open(path)
            .map_err(|e| StatsError::InvalidArgument(format!("Failed to open file: {e}")))?;

        // SAFETY: mapping a file is unsafe because another process could
        // mutate it underneath us; callers must keep the file stable for
        // the duration of `processor`.
        let mmap = unsafe {
            Mmap::map(&file)
                .map_err(|e| StatsError::ComputationError(format!("Failed to mmap file: {}", e)))?
        };

        // SAFETY: the mapping is page-aligned (which satisfies F's
        // alignment) and the length is floored to whole elements, so the
        // slice stays within the mapped region. NOTE(review): assumes the
        // file holds raw F values in native byte order — confirm with the
        // producers of these files.
        let contents = unsafe {
            std::slice::from_raw_parts(
                mmap.as_ptr() as *const F,
                mmap.len() / std::mem::size_of::<F>(),
            )
        };

        processor(contents)
    }
}