1use crate::error::{StatsError, StatsResult};
10use scirs2_core::ndarray::{ArrayBase, Data, Ix1};
11use scirs2_core::numeric::{Float, NumCast};
12use std::cell::RefCell;
13use std::collections::VecDeque;
14use std::rc::Rc;
15
16#[cfg(feature = "memmap")]
17use memmap2::Mmap;
18
/// Tunables for the memory-aware statistics routines in this module.
#[derive(Debug, Clone)]
pub struct MemoryConfig {
    /// Soft cap on total memory usage, in bytes (default: 1 << 30, i.e. 1 GiB).
    pub max_memory: usize,
    /// Number of elements processed per chunk in streaming computations.
    pub chunksize: usize,
    /// When true, `MemoryPool` recycles buffers instead of allocating fresh ones.
    pub use_pooling: bool,
    /// Assumed CPU cache-line size in bytes; used to choose cache-friendly
    /// chunk sizes (default: 64).
    pub cache_linesize: usize,
}
31
32impl Default for MemoryConfig {
33 fn default() -> Self {
34 Self {
35 max_memory: 1 << 30, chunksize: 8192, use_pooling: true,
38 cache_linesize: 64,
39 }
40 }
41}
42
/// Size-class-bucketed free list of reusable `Vec<F>` buffers.
///
/// Interior mutability goes through `RefCell`, so a pool instance is
/// single-threaded; share it via `Rc`, not across threads.
pub struct MemoryPool<F> {
    // One FIFO free-list per power-of-two size class.
    pools: RefCell<Vec<VecDeque<Vec<F>>>>,
    // Controls whether pooling is active (see `MemoryConfig::use_pooling`).
    config: MemoryConfig,
}
48
49impl<F: Float> MemoryPool<F> {
50 pub fn new(config: MemoryConfig) -> Self {
51 let pools = vec![VecDeque::new(); 20]; Self {
54 pools: RefCell::new(pools),
55 config,
56 }
57 }
58
59 pub fn acquire(&self, size: usize) -> Vec<F> {
61 if !self.config.use_pooling {
62 return vec![F::zero(); size];
63 }
64
65 let pool_idx = (size.next_power_of_two().trailing_zeros() as usize).min(19);
66 let mut pools = self.pools.borrow_mut();
67
68 if let Some(mut buffer) = pools[pool_idx].pop_front() {
69 buffer.resize(size, F::zero());
70 buffer
71 } else {
72 vec![F::zero(); size]
73 }
74 }
75
76 pub fn release(&self, buffer: Vec<F>) {
78 if !self.config.use_pooling || buffer.is_empty() {
79 return;
80 }
81
82 let capacity = buffer.capacity();
83 let pool_idx = (capacity.next_power_of_two().trailing_zeros() as usize).min(19);
84 let mut pools = self.pools.borrow_mut();
85
86 if pools[pool_idx].len() < 10 {
88 pools[pool_idx].push_back(buffer);
89 }
90 }
91}
92
93#[allow(dead_code)]
97pub fn mean_zero_copy<F, D>(x: &ArrayBase<D, Ix1>) -> StatsResult<F>
98where
99 F: Float + NumCast,
100 D: Data<Elem = F>,
101{
102 if x.is_empty() {
103 return Err(StatsError::invalid_argument(
104 "Cannot compute mean of empty array",
105 ));
106 }
107
108 let mut sum = F::zero();
110 let mut c = F::zero(); for &val in x.iter() {
113 let y = val - c;
114 let t = sum + y;
115 c = (t - sum) - y;
116 sum = t;
117 }
118
119 Ok(sum / F::from(x.len()).unwrap())
120}
121
122#[allow(dead_code)]
126pub fn variance_cache_aware<F, D>(
127 x: &ArrayBase<D, Ix1>,
128 ddof: usize,
129 config: Option<MemoryConfig>,
130) -> StatsResult<F>
131where
132 F: Float + NumCast,
133 D: Data<Elem = F>,
134{
135 let n = x.len();
136 if n <= ddof {
137 return Err(StatsError::invalid_argument(
138 "Not enough data points for the given degrees of freedom",
139 ));
140 }
141
142 let config = config.unwrap_or_default();
143 let cache_elements = config.cache_linesize / std::mem::size_of::<F>();
144
145 let mean = mean_zero_copy(x)?;
147
148 let mut sum_sq_dev = F::zero();
150 let mut c = F::zero(); let chunksize = cache_elements.min(n); let mut processed = 0;
156 for chunk in x.exact_chunks(chunksize) {
157 for &val in chunk.iter() {
158 let dev = val - mean;
159 let sq_dev = dev * dev;
160
161 let y = sq_dev - c;
163 let t = sum_sq_dev + y;
164 c = (t - sum_sq_dev) - y;
165 sum_sq_dev = t;
166 processed += 1;
167 }
168 }
169
170 for i in processed..n {
172 let val = x[i];
173 let dev = val - mean;
174 let sq_dev = dev * dev;
175
176 let y = sq_dev - c;
178 let t = sum_sq_dev + y;
179 c = (t - sum_sq_dev) - y;
180 sum_sq_dev = t;
181 }
182
183 Ok(sum_sq_dev / F::from(n - ddof).unwrap())
184}
185
/// Lazily computed, cached summary statistics over a borrowed 1-D array.
///
/// Each statistic is computed on first request and memoized in a `RefCell`,
/// so every accessor takes `&self`. Not thread-safe.
pub struct LazyStats<'a, F, D>
where
    F: Float,
    D: Data<Elem = F>,
{
    // Borrowed input; never copied except for the sorted cache below.
    data: &'a ArrayBase<D, Ix1>,
    mean: RefCell<Option<F>>,
    variance: RefCell<Option<F>>,
    min: RefCell<Option<F>>,
    max: RefCell<Option<F>>,
    // Sorted copy of the data, built on demand for quantile queries.
    sorted: RefCell<Option<Vec<F>>>,
}
199
200impl<'a, F, D> LazyStats<'a, F, D>
201where
202 F: Float + NumCast,
203 D: Data<Elem = F>,
204{
205 pub fn new(data: &'a ArrayBase<D, Ix1>) -> Self {
206 Self {
207 data,
208 mean: RefCell::new(None),
209 variance: RefCell::new(None),
210 min: RefCell::new(None),
211 max: RefCell::new(None),
212 sorted: RefCell::new(None),
213 }
214 }
215
216 pub fn mean(&self) -> StatsResult<F> {
218 if let Some(mean) = *self.mean.borrow() {
219 return Ok(mean);
220 }
221
222 let mean = mean_zero_copy(self.data)?;
223 *self.mean.borrow_mut() = Some(mean);
224 Ok(mean)
225 }
226
227 pub fn variance(&self, ddof: usize) -> StatsResult<F> {
229 if let Some(var) = *self.variance.borrow() {
230 return Ok(var);
231 }
232
233 let var = variance_cache_aware(self.data, ddof, None)?;
234 *self.variance.borrow_mut() = Some(var);
235 Ok(var)
236 }
237
238 pub fn minmax(&self) -> StatsResult<(F, F)> {
240 if let (Some(min), Some(max)) = (*self.min.borrow(), *self.max.borrow()) {
241 return Ok((min, max));
242 }
243
244 if self.data.is_empty() {
245 return Err(StatsError::invalid_argument("Empty array"));
246 }
247
248 let (min, max) = self
249 .data
250 .iter()
251 .fold((self.data[0], self.data[0]), |(min, max), &val| {
252 (
253 if val < min { val } else { min },
254 if val > max { val } else { max },
255 )
256 });
257
258 *self.min.borrow_mut() = Some(min);
259 *self.max.borrow_mut() = Some(max);
260 Ok((min, max))
261 }
262
263 pub fn median(&self) -> StatsResult<F> {
265 self.quantile(F::from(0.5).unwrap())
266 }
267
268 pub fn quantile(&self, q: F) -> StatsResult<F> {
270 if self.data.is_empty() {
271 return Err(StatsError::invalid_argument("Empty array"));
272 }
273
274 let mut sorted_ref = self.sorted.borrow_mut();
275 if sorted_ref.is_none() {
276 let mut sorted: Vec<F> = self.data.iter().cloned().collect();
277 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
278 *sorted_ref = Some(sorted);
279 }
280
281 let sorted = sorted_ref.as_ref().unwrap();
282 let n = sorted.len();
283 let pos = q * F::from(n - 1).unwrap();
284 let idx = pos.floor();
285 let frac = pos - idx;
286
287 let idx = idx.to_usize().unwrap_or(0).min(n - 1);
288
289 if idx == n - 1 {
290 Ok(sorted[idx])
291 } else {
292 Ok(sorted[idx] * (F::one() - frac) + sorted[idx + 1] * frac)
293 }
294 }
295}
296
/// Single-pass accumulator for a covariance matrix over streamed samples.
pub struct StreamingCovariance<F> {
    // Number of samples observed so far.
    n: usize,
    // Running mean of each feature.
    means: Vec<F>,
    // Running co-moment matrix (unnormalized covariance sums).
    cov: Vec<Vec<F>>,
    // Pool supplying the per-update scratch buffer.
    pool: Rc<MemoryPool<F>>,
}
306
307impl<F: Float + NumCast + std::fmt::Display> StreamingCovariance<F> {
308 pub fn new(nfeatures: usize, pool: Rc<MemoryPool<F>>) -> Self {
309 Self {
310 n: 0,
311 means: vec![F::zero(); nfeatures],
312 cov: vec![vec![F::zero(); nfeatures]; nfeatures],
313 pool,
314 }
315 }
316
317 pub fn update(&mut self, sample: &[F]) {
319 assert_eq!(sample.len(), self.means.len(), "Feature dimension mismatch");
320
321 self.n += 1;
322 let n_f = F::from(self.n).unwrap();
323
324 let mut deltas = self.pool.acquire(sample.len());
326
327 for i in 0..sample.len() {
328 deltas[i] = sample[i] - self.means[i];
329 self.means[i] = self.means[i] + deltas[i] / n_f;
330 }
331
332 for i in 0..sample.len() {
334 for j in i..sample.len() {
335 let delta_prod = deltas[i] * (sample[j] - self.means[j]);
336 self.cov[i][j] = self.cov[i][j] + delta_prod;
337 if i != j {
338 self.cov[j][i] = self.cov[i][j]; }
340 }
341 }
342
343 self.pool.release(deltas);
344 }
345
346 pub fn covariance(&self, ddof: usize) -> StatsResult<Vec<Vec<F>>> {
348 if self.n <= ddof {
349 return Err(StatsError::invalid_argument(
350 "Not enough samples for given degrees of freedom",
351 ));
352 }
353
354 let factor = F::from(self.n - ddof).unwrap();
355 let mut result = self.cov.clone();
356
357 for i in 0..result.len() {
358 for j in 0..result[i].len() {
359 result[i][j] = result[i][j] / factor;
360 }
361 }
362
363 Ok(result)
364 }
365}
366
/// Statistics over a memory-mapped file interpreted as a contiguous array of
/// `f64` values in the platform's native byte representation.
#[cfg(feature = "memmap")]
pub struct MemoryMappedStats {
    // Read-only mapping; kept alive for the lifetime of this struct.
    mmap: Mmap,
    // Size of one element in bytes (size_of::<f64>()).
    elementsize: usize,
    // Number of whole f64 elements in the file (trailing partial bytes ignored).
    n_elements: usize,
}
374
#[cfg(feature = "memmap")]
impl MemoryMappedStats {
    /// Memory-maps the file at `path` read-only and records how many whole
    /// `f64` elements it contains (any trailing partial element is ignored).
    ///
    /// # Errors
    /// Returns a computation error if the file cannot be opened, stat'ed,
    /// or mapped.
    pub fn new(path: &std::path::Path) -> StatsResult<Self> {
        use std::fs::OpenOptions;

        let file = OpenOptions::new()
            .read(true)
            .open(path)
            .map_err(|e| StatsError::computation(format!("Failed to open file: {}", e)))?;

        let metadata = file
            .metadata()
            .map_err(|e| StatsError::computation(format!("Failed to get metadata: {}", e)))?;

        let filesize = metadata.len() as usize;
        let elementsize = std::mem::size_of::<f64>();
        let n_elements = filesize / elementsize;

        // SAFETY: the file is opened read-only and the mapping is owned by
        // `self` for its whole lifetime. Mmap::map is unsafe because another
        // process could mutate the file underneath us; callers accept that
        // caveat for raw data files. The unsafe block is kept minimal.
        let mmap = unsafe { Mmap::map(&file) }
            .map_err(|e| StatsError::computation(format!("Failed to mmap: {}", e)))?;

        Ok(Self {
            mmap,
            elementsize,
            n_elements,
        })
    }

    /// Chunked mean of the mapped `f64` values.
    ///
    /// # Errors
    /// Returns an invalid-argument error when the file holds no complete
    /// element (previously this divided by zero and returned NaN).
    pub fn mean(&self) -> StatsResult<f64> {
        if self.n_elements == 0 {
            return Err(StatsError::invalid_argument(
                "Cannot compute mean of empty file",
            ));
        }

        // SAFETY: the mapping is page-aligned (satisfying f64 alignment) and
        // `n_elements * size_of::<f64>()` bytes are within the mapped file,
        // which lives as long as `self`.
        let data = unsafe {
            std::slice::from_raw_parts(self.mmap.as_ptr() as *const f64, self.n_elements)
        };

        // Sum in fixed-size chunks to keep the working set cache-friendly.
        let chunksize = 8192;
        let mut sum = 0.0;

        for chunk in data.chunks(chunksize) {
            sum += chunk.iter().sum::<f64>();
        }

        Ok(sum / self.n_elements as f64)
    }
}
422
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::array;

    /// A released buffer should be reusable at the same size.
    #[test]
    fn test_memory_pool() {
        let pool = MemoryPool::<f64>::new(MemoryConfig::default());

        let first = pool.acquire(100);
        assert_eq!(first.len(), 100);
        pool.release(first);

        let second = pool.acquire(100);
        assert_eq!(second.len(), 100);
    }

    /// Lazily computed statistics should match hand-computed values.
    #[test]
    fn test_lazy_stats() {
        let values = array![1.0, 2.0, 3.0, 4.0, 5.0];
        let stats = LazyStats::new(&values);

        assert!((stats.mean().unwrap() - 3.0).abs() < 1e-10);
        assert!((stats.variance(1).unwrap() - 2.5).abs() < 1e-10);

        let (lo, hi) = stats.minmax().unwrap();
        assert_eq!(lo, 1.0);
        assert_eq!(hi, 5.0);

        assert!((stats.median().unwrap() - 3.0).abs() < 1e-10);
    }

    /// Perfectly correlated features should yield positive diagonal entries.
    #[test]
    fn test_streaming_covariance() {
        let pool = Rc::new(MemoryPool::new(MemoryConfig::default()));
        let mut tracker = StreamingCovariance::new(2, pool);

        for sample in &[[1.0, 2.0], [2.0, 4.0], [3.0, 6.0]] {
            tracker.update(sample);
        }

        let matrix = tracker.covariance(1).unwrap();
        assert!(matrix[0][0] > 0.0);
        assert!(matrix[1][1] > 0.0);
    }
}