use crate::error::{StatsError, StatsResult};
use scirs2_core::ndarray::{s, Array1, Array2, ArrayBase, ArrayView1, ArrayView2, Data, Ix1, Ix2};
use scirs2_core::numeric::{Float, NumCast};
use std::collections::VecDeque;
use std::sync::Arc;
/// Snapshot of a tracked memory session, produced by `MemoryTracker::get_profile`.
#[derive(Debug, Clone)]
pub struct MemoryProfile {
/// High-water mark of tracked usage, in bytes.
pub peak_memory: usize,
/// Midpoint of peak and current usage, in bytes (not a time-weighted mean).
pub avg_memory: usize,
/// Number of recorded allocation events.
pub allocations: usize,
/// Number of recorded deallocation events.
pub deallocations: usize,
/// 1.0 when all allocated memory was released, 0.0 when usage still sits at its peak.
pub efficiency_score: f64,
}
/// Chooses statistics algorithms (direct / optimized / streaming) based on
/// how much memory the host appears to have.
pub struct MemoryAdaptiveAlgorithm {
/// Estimated available memory in bytes, probed once at construction.
available_memory: usize,
/// Chunk size in bytes, sized to roughly fit in an L3 cache.
preferred_chunksize: usize,
/// Whether to favor in-place algorithms (set when < ~1 GB is available).
#[allow(dead_code)]
prefer_inplace: bool,
}
impl Default for MemoryAdaptiveAlgorithm {
fn default() -> Self {
Self::new()
}
}
impl MemoryAdaptiveAlgorithm {
    /// Probes the host for available memory and derives a cache-friendly
    /// chunk size from it.
    pub fn new() -> Self {
        let available_memory = Self::estimate_available_memory();
        let preferred_chunksize = Self::calculate_optimal_chunksize(available_memory);
        Self {
            available_memory,
            preferred_chunksize,
            // Favor in-place algorithms on small machines (< ~1 GB available).
            prefer_inplace: available_memory < 1_000_000_000,
        }
    }

    /// Estimates currently available memory in bytes via an OS-specific
    /// probe, falling back to a conservative constant elsewhere.
    fn estimate_available_memory() -> usize {
        #[cfg(target_os = "linux")]
        {
            Self::get_available_memory_linux()
        }
        #[cfg(target_os = "windows")]
        {
            Self::get_available_memory_windows()
        }
        #[cfg(target_os = "macos")]
        {
            Self::get_available_memory_macos()
        }
        #[cfg(not(any(target_os = "linux", target_os = "windows", target_os = "macos")))]
        {
            Self::get_available_memory_fallback()
        }
    }

    /// Reads `/proc/meminfo`, preferring the kernel's `MemAvailable`
    /// estimate, then `MemFree`, then half of `MemTotal`.
    #[cfg(target_os = "linux")]
    fn get_available_memory_linux() -> usize {
        use std::fs;
        // Parses the numeric kB field of a meminfo line ("Key:  <n> kB") into bytes.
        fn field_bytes(line: &str) -> Option<usize> {
            line.split_whitespace()
                .nth(1)
                .and_then(|v| v.parse::<usize>().ok())
                .map(|kb| kb * 1024)
        }
        if let Ok(meminfo) = fs::read_to_string("/proc/meminfo") {
            let mut mem_available = None;
            let mut mem_free = None;
            let mut mem_total = None;
            for line in meminfo.lines() {
                if line.starts_with("MemAvailable:") {
                    mem_available = field_bytes(line).or(mem_available);
                } else if line.starts_with("MemFree:") {
                    mem_free = field_bytes(line).or(mem_free);
                } else if line.starts_with("MemTotal:") {
                    mem_total = field_bytes(line).or(mem_total);
                }
            }
            if let Some(available) = mem_available {
                return available;
            } else if let Some(free) = mem_free {
                return free;
            } else if let Some(total) = mem_total {
                // No availability info at all: assume half the machine is usable.
                return total / 2;
            }
        }
        Self::get_available_memory_fallback()
    }

    /// Windows has no cheap std-only probe here; assume a 4 GB machine and
    /// budget a quarter of it.
    #[cfg(target_os = "windows")]
    fn get_available_memory_windows() -> usize {
        let conservative_total = 4_000_000_000;
        conservative_total / 4
    }

    /// Shells out to `vm_stat` and counts free + inactive pages.
    #[cfg(target_os = "macos")]
    fn get_available_memory_macos() -> usize {
        use std::process::Command;
        if let Ok(output) = Command::new("vm_stat").output() {
            if let Ok(stdout) = String::from_utf8(output.stdout) {
                // Default page size; overwritten by the header line when present.
                let mut pagesize = 4096;
                let mut free_pages = 0;
                let mut inactive_pages = 0;
                for line in stdout.lines() {
                    if line.starts_with("Mach Virtual Memory Statistics:") {
                        // Header looks like: "... (page size of 16384 bytes)".
                        if line.contains("page size of") {
                            if let Some(size_str) = line.split("page size of ").nth(1) {
                                if let Some(size_str) = size_str.split(" bytes").next() {
                                    if let Ok(size) = size_str.parse::<usize>() {
                                        pagesize = size;
                                    }
                                }
                            }
                        }
                    } else if line.starts_with("Pages free:") {
                        // Counts are printed with a trailing '.', e.g. "Pages free: 12345.".
                        if let Some(count_str) = line.split(':').nth(1) {
                            if let Some(count_str) = count_str.trim().split('.').next() {
                                if let Ok(count) = count_str.parse::<usize>() {
                                    free_pages = count;
                                }
                            }
                        }
                    } else if line.starts_with("Pages inactive:") {
                        if let Some(count_str) = line.split(':').nth(1) {
                            if let Some(count_str) = count_str.trim().split('.').next() {
                                if let Ok(count) = count_str.parse::<usize>() {
                                    inactive_pages = count;
                                }
                            }
                        }
                    }
                }
                return (free_pages + inactive_pages) * pagesize;
            }
        }
        Self::get_available_memory_fallback()
    }

    /// Unknown platform or failed probe: assume a 2 GB machine and budget a
    /// quarter of it.
    fn get_available_memory_fallback() -> usize {
        let conservative_total = 2_000_000_000;
        conservative_total / 4
    }

    /// Picks a chunk size that fits a typical L3 cache (~8 MB) without
    /// exceeding a tenth of available memory, with a 4 KiB floor.
    /// (Renamed the parameter: it was `_availablememory` despite being used.)
    fn calculate_optimal_chunksize(available_memory: usize) -> usize {
        let l3_cache_estimate = 8_000_000;
        let max_chunk = available_memory / 10;
        l3_cache_estimate.min(max_chunk).max(4096)
    }

    /// Returns true when `bytes` fits within half the available-memory
    /// budget (the other half is left for the rest of the process).
    pub fn can_allocate(&self, bytes: usize) -> bool {
        bytes <= self.available_memory / 2
    }

    /// Recommends a strategy for processing `datasize` elements of type `F`:
    /// direct for < 1 MB, optimized while the allocation fits the budget,
    /// otherwise streaming in chunks of the returned element count.
    pub fn recommend_algorithm<F: Float>(&self, datasize: usize) -> AlgorithmChoice {
        let elementsize = std::mem::size_of::<F>();
        // Saturate so absurd sizes land in the streaming branch instead of
        // wrapping around to a small byte count and picking Direct.
        let total_bytes = datasize.saturating_mul(elementsize);
        if total_bytes < 1_000_000 {
            AlgorithmChoice::Direct
        } else if self.can_allocate(total_bytes) {
            AlgorithmChoice::Optimized
        } else {
            AlgorithmChoice::Streaming(self.preferred_chunksize / elementsize)
        }
    }
}
/// Strategy recommended by `MemoryAdaptiveAlgorithm::recommend_algorithm`.
#[derive(Debug, Clone)]
pub enum AlgorithmChoice {
/// Small data: compute directly with no special handling.
Direct,
/// Data fits the memory budget: use the optimized in-memory path.
Optimized,
/// Data too large: process in chunks of the given element count.
Streaming(usize),
}
pub mod zero_copy {
    use super::*;

    /// Applies `stat_fn` to every sliding window of length `windowsize`,
    /// borrowing a view into `data` for each window instead of copying it.
    ///
    /// # Errors
    /// Returns an error for a zero-length window or one longer than the
    /// data, and propagates the first error returned by `stat_fn`.
    pub fn rolling_stats_zerocopy<F, D, S>(
        data: &ArrayBase<D, Ix1>,
        windowsize: usize,
        stat_fn: S,
    ) -> StatsResult<Array1<F>>
    where
        F: Float + NumCast,
        D: Data<Elem = F>,
        S: Fn(ArrayView1<F>) -> StatsResult<F>,
    {
        let len = data.len();
        if windowsize == 0 || windowsize > len {
            return Err(StatsError::invalid_argument("Invalid window size"));
        }
        // One output value per window start; collect short-circuits on the
        // first statistic that fails.
        let window_count = len - windowsize + 1;
        let values: Vec<F> = (0..window_count)
            .map(|start| stat_fn(data.slice(s![start..start + windowsize])))
            .collect::<StatsResult<_>>()?;
        Ok(Array1::from_vec(values))
    }

    /// Evaluates `operation` once per unordered pair of rows of `data`,
    /// filling a symmetric matrix with ones on the diagonal
    /// (correlation-matrix convention).
    pub fn pairwise_operation_zerocopy<F, D, Op>(
        data: &ArrayBase<D, Ix2>,
        operation: Op,
    ) -> StatsResult<Array2<F>>
    where
        F: Float + NumCast,
        D: Data<Elem = F>,
        Op: Fn(ArrayView1<F>, ArrayView1<F>) -> StatsResult<F>,
    {
        let rows = data.nrows();
        let mut matrix = Array2::zeros((rows, rows));
        for i in 0..rows {
            // Self-comparison is defined as one rather than computed.
            matrix[(i, i)] = F::one();
            for j in (i + 1)..rows {
                let pair_value = operation(data.row(i), data.row(j))?;
                // Exploit symmetry: compute once, mirror across the diagonal.
                matrix[(i, j)] = pair_value;
                matrix[(j, i)] = pair_value;
            }
        }
        Ok(matrix)
    }
}
pub mod memory_mapped {
    use super::*;

    /// Computes the mean of a dataset supplied as a sequence of chunks
    /// (e.g. memory-mapped segments), without materializing it whole.
    ///
    /// # Errors
    /// Returns an error if `total_count` is zero or if the chunks do not
    /// deliver exactly `total_count` elements.
    pub fn mmap_mean<'a, F: Float + NumCast + std::fmt::Display + std::iter::Sum<F> + 'a>(
        data_chunks: impl Iterator<Item = ArrayView1<'a, F>>,
        total_count: usize,
    ) -> StatsResult<F> {
        if total_count == 0 {
            return Err(StatsError::invalid_argument("Empty dataset"));
        }
        let mut total_sum = F::zero();
        let mut count_processed = 0;
        for chunk in data_chunks {
            total_sum = total_sum + chunk.sum();
            count_processed += chunk.len();
        }
        // Guard against the caller misreporting the dataset size.
        // (Message fixed: it previously read "Chunk _count mismatch".)
        if count_processed != total_count {
            return Err(StatsError::invalid_argument("Chunk count mismatch"));
        }
        Ok(total_sum / F::from(total_count).expect("Failed to convert to float"))
    }

    /// Computes `(mean, variance)` in a single pass over chunked data using
    /// Welford's online algorithm, which is numerically stable.
    ///
    /// # Errors
    /// Returns an error when `total_count <= ddof` or when the chunks do
    /// not deliver exactly `total_count` elements.
    pub fn mmap_variance<'a, F: Float + NumCast + std::fmt::Display + 'a>(
        data_chunks: impl Iterator<Item = ArrayView1<'a, F>>,
        total_count: usize,
        ddof: usize,
    ) -> StatsResult<(F, F)> {
        if total_count <= ddof {
            return Err(StatsError::invalid_argument("Insufficient data for ddof"));
        }
        let mut mean = F::zero();
        let mut m2 = F::zero();
        let mut count = 0usize;
        for chunk in data_chunks {
            for &value in chunk.iter() {
                count += 1;
                let delta = value - mean;
                mean = mean + delta / F::from(count).expect("Failed to convert to float");
                let delta2 = value - mean;
                m2 = m2 + delta * delta2;
            }
        }
        // Without this check, `count - ddof` below could underflow when the
        // chunks deliver fewer elements than promised.
        if count != total_count {
            return Err(StatsError::invalid_argument("Chunk count mismatch"));
        }
        let variance = m2 / F::from(count - ddof).expect("Failed to convert to float");
        Ok((mean, variance))
    }
}
/// Fixed-capacity sliding-window accumulator: keeps running sum and sum of
/// squares so mean/variance over the most recent values are O(1).
pub struct RingBufferStats<F: Float> {
/// The retained window, oldest value at the front.
buffer: VecDeque<F>,
/// Maximum number of values retained in the window.
capacity: usize,
/// Running sum of the buffered values.
sum: F,
/// Running sum of the squared buffered values.
sum_squares: F,
}
impl<F: Float + NumCast + std::fmt::Display> RingBufferStats<F> {
    /// Creates an empty window that retains at most `capacity` values.
    /// NOTE(review): `capacity == 0` currently behaves like capacity 1 in
    /// `push` (the pre-insert eviction runs first) — confirm intent.
    pub fn new(capacity: usize) -> Self {
        Self {
            buffer: VecDeque::with_capacity(capacity),
            capacity,
            sum: F::zero(),
            sum_squares: F::zero(),
        }
    }

    /// Appends `value`, evicting the oldest entry when the window is full,
    /// and keeps both running sums in step with the buffer contents.
    pub fn push(&mut self, value: F) {
        if self.buffer.len() >= self.capacity {
            if let Some(old_value) = self.buffer.pop_front() {
                self.sum = self.sum - old_value;
                self.sum_squares = self.sum_squares - old_value * old_value;
            }
        }
        self.buffer.push_back(value);
        self.sum = self.sum + value;
        self.sum_squares = self.sum_squares + value * value;
    }

    /// Mean of the buffered values, or zero when the window is empty.
    pub fn mean(&self) -> F {
        if self.buffer.is_empty() {
            F::zero()
        } else {
            self.sum / F::from(self.buffer.len()).expect("Failed to convert to float")
        }
    }

    /// Sample variance with `ddof` delta degrees of freedom, or `None`
    /// when fewer than `ddof + 1` values are buffered.
    pub fn variance(&self, ddof: usize) -> Option<F> {
        let n = self.buffer.len();
        if n <= ddof {
            return None;
        }
        let n_f = F::from(n).expect("Failed to convert to float");
        let mean = self.mean();
        // E[x^2] - mean^2 from the running sums. Catastrophic cancellation
        // can push this slightly negative for data with a large mean, which
        // previously made `std` return NaN; clamp at zero instead.
        let population = (self.sum_squares / n_f - mean * mean).max(F::zero());
        // Rescale population variance by n / (n - ddof) for the sample form.
        Some(population * n_f / F::from(n - ddof).expect("Failed to convert to float"))
    }

    /// Sample standard deviation; `None` under the same conditions as
    /// [`Self::variance`].
    pub fn std(&self, ddof: usize) -> Option<F> {
        self.variance(ddof).map(|v| v.sqrt())
    }
}
/// Deferred statistics pipeline: operations are queued by builder methods
/// and evaluated together in `compute`, sharing intermediate results
/// (the mean, a sorted copy) across operations that need them.
pub struct LazyStatComputation<F: Float> {
/// Shared, immutable input data.
data_ref: Arc<Vec<F>>,
/// Queued operations, evaluated in insertion order.
operations: Vec<StatOperation>,
}
// A single queued statistic for `LazyStatComputation`.
#[derive(Clone)]
enum StatOperation {
// Arithmetic mean of the data.
Mean,
// Variance(ddof): sample variance with the given delta degrees of freedom.
// Quantile(q): order statistic for q in [0, 1] with linear interpolation.
Variance(usize), Quantile(f64),
// Declared but not materialized by `compute` (pushes a placeholder 1).
#[allow(dead_code)]
StandardScaling,
}
impl<F: Float + NumCast + std::iter::Sum + std::fmt::Display> LazyStatComputation<F> {
pub fn new(data: Vec<F>) -> Self {
Self {
data_ref: Arc::new(data),
operations: Vec::new(),
}
}
pub fn mean(mut self) -> Self {
self.operations.push(StatOperation::Mean);
self
}
pub fn variance(mut self, ddof: usize) -> Self {
self.operations.push(StatOperation::Variance(ddof));
self
}
pub fn quantile(mut self, q: f64) -> Self {
self.operations.push(StatOperation::Quantile(q));
self
}
pub fn compute(&self) -> StatsResult<Vec<F>> {
let mut results = Vec::new();
let data = &*self.data_ref;
let need_mean = self
.operations
.iter()
.any(|op| matches!(op, StatOperation::Mean | StatOperation::Variance(_)));
let need_sorted = self
.operations
.iter()
.any(|op| matches!(op, StatOperation::Quantile(_)));
let mean = if need_mean {
Some(
data.iter().fold(F::zero(), |acc, &x| acc + x)
/ F::from(data.len()).expect("Operation failed"),
)
} else {
None
};
let sorteddata = if need_sorted {
let mut sorted = data.clone();
sorted.sort_by(|a, b| a.partial_cmp(b).expect("Operation failed"));
Some(sorted)
} else {
None
};
for op in &self.operations {
match op {
StatOperation::Mean => {
results.push(mean.expect("Operation failed"));
}
StatOperation::Variance(ddof) => {
let m = mean.expect("Operation failed");
let var = data
.iter()
.map(|&x| {
let diff = x - m;
diff * diff
})
.sum::<F>()
/ F::from(data.len() - ddof).expect("Operation failed");
results.push(var);
}
StatOperation::Quantile(q) => {
let sorted = sorteddata.as_ref().expect("Operation failed");
let pos = *q * (sorted.len() - 1) as f64;
let idx = pos.floor() as usize;
let frac = pos - pos.floor();
let result = if frac == 0.0 {
sorted[idx]
} else {
let lower = sorted[idx];
let upper = sorted[idx + 1];
lower + F::from(frac).expect("Failed to convert to float") * (upper - lower)
};
results.push(result);
}
StatOperation::StandardScaling => {
results.push(F::one());
}
}
}
Ok(results)
}
}
/// Lightweight manual allocation tracker used to build `MemoryProfile`s.
pub struct MemoryTracker {
/// Bytes currently recorded as live.
current_usage: usize,
/// High-water mark of `current_usage`.
peak_usage: usize,
/// Number of recorded allocation events.
allocations: usize,
/// Number of recorded deallocation events.
deallocations: usize,
}
impl Default for MemoryTracker {
fn default() -> Self {
Self::new()
}
}
impl MemoryTracker {
    /// Creates a tracker with all counters at zero.
    pub fn new() -> Self {
        Self {
            current_usage: 0,
            peak_usage: 0,
            allocations: 0,
            deallocations: 0,
        }
    }

    /// Records an allocation of `bytes`, updating the high-water mark.
    pub fn record_allocation(&mut self, bytes: usize) {
        // Saturate instead of overflowing if the tracked total ever exceeds
        // usize::MAX (mirrors the saturating deallocation path below).
        self.current_usage = self.current_usage.saturating_add(bytes);
        self.peak_usage = self.peak_usage.max(self.current_usage);
        self.allocations += 1;
    }

    /// Records a deallocation of `bytes`; tracked usage never goes below zero.
    pub fn record_deallocation(&mut self, bytes: usize) {
        self.current_usage = self.current_usage.saturating_sub(bytes);
        self.deallocations += 1;
    }

    /// Snapshots the tracked counters into a [`MemoryProfile`].
    pub fn get_profile(&self) -> MemoryProfile {
        // 1.0 when everything allocated has been released; 0.0 when usage
        // still sits at its peak.
        let efficiency_score = if self.peak_usage > 0 {
            1.0 - (self.current_usage as f64 / self.peak_usage as f64)
        } else {
            1.0
        };
        MemoryProfile {
            peak_memory: self.peak_usage,
            // NOTE(review): midpoint of peak and current, not a true
            // time-weighted average — confirm this is the intended metric.
            avg_memory: (self.peak_usage + self.current_usage) / 2,
            allocations: self.allocations,
            deallocations: self.deallocations,
            efficiency_score,
        }
    }
}
pub mod cache_friendly {
    use super::*;

    /// Applies `operation` tile-by-tile over a conceptual `a * b` product,
    /// accumulating tile results into the output to improve cache locality.
    ///
    /// `operation` receives an `(i..i_end, k..k_end)` tile of `a` and a
    /// `(k..k_end, j..j_end)` tile of `b` and must return a tile of shape
    /// `(i_end - i, j_end - j)` whose values are added into the result.
    ///
    /// # Errors
    /// Returns an error when the inner dimensions of `a` and `b` differ, or
    /// when `tilesize` is zero (which would otherwise panic in `step_by`).
    pub fn tiled_matrix_operation<F, D1, D2, Op>(
        a: &ArrayBase<D1, Ix2>,
        b: &ArrayBase<D2, Ix2>,
        tilesize: usize,
        operation: Op,
    ) -> StatsResult<Array2<F>>
    where
        F: Float + NumCast,
        D1: Data<Elem = F>,
        D2: Data<Elem = F>,
        Op: Fn(ArrayView2<F>, ArrayView2<F>) -> StatsResult<Array2<F>>,
    {
        // `step_by(0)` panics; reject it as an argument error instead.
        if tilesize == 0 {
            return Err(StatsError::invalid_argument("tilesize must be non-zero"));
        }
        let (m, k1) = a.dim();
        let (k2, n) = b.dim();
        if k1 != k2 {
            return Err(StatsError::dimension_mismatch(
                "Matrix dimensions incompatible",
            ));
        }
        let mut result = Array2::zeros((m, n));
        for i in (0..m).step_by(tilesize) {
            for j in (0..n).step_by(tilesize) {
                for k in (0..k1).step_by(tilesize) {
                    // Clamp tile edges at the matrix boundaries.
                    let i_end = (i + tilesize).min(m);
                    let j_end = (j + tilesize).min(n);
                    let k_end = (k + tilesize).min(k1);
                    let a_tile = a.slice(s![i..i_end, k..k_end]);
                    let b_tile = b.slice(s![k..k_end, j..j_end]);
                    let tile_result = operation(a_tile, b_tile)?;
                    // Accumulate this k-slab's contribution into the output tile.
                    let mut result_tile = result.slice_mut(s![i..i_end, j..j_end]);
                    result_tile.zip_mut_with(&tile_result, |r, &t| *r = *r + t);
                }
            }
        }
        Ok(result)
    }
}
// Unit tests covering adaptive algorithm selection, the ring-buffer
// accumulator, the lazy pipeline, and zero-copy rolling statistics.
#[cfg(test)]
mod tests {
use super::*;
use approx::assert_relative_eq;
use scirs2_core::ndarray::array;
#[test]
fn test_memory_adaptive_algorithm() {
let adapter = MemoryAdaptiveAlgorithm::new();
// 100 f64s (800 bytes) is far below the 1 MB direct-path threshold.
match adapter.recommend_algorithm::<f64>(100) {
AlgorithmChoice::Direct => (), _ => panic!("Expected Direct algorithm for small data"),
}
// available_memory / 4 f64 elements = available_memory * 2 bytes, which
// exceeds the can_allocate budget (available_memory / 2), so the
// streaming path must be selected.
let hugedatasize = adapter.available_memory / 4; match adapter.recommend_algorithm::<f64>(hugedatasize) {
AlgorithmChoice::Streaming(_) => (), other => panic!(
"Expected Streaming algorithm for large data, got {:?}",
other
),
}
}
#[test]
fn test_ring_buffer_stats() {
let mut buffer = RingBufferStats::<f64>::new(5);
for i in 1..=5 {
buffer.push(i as f64);
}
// Mean of 1..=5 is 3.
assert_relative_eq!(buffer.mean(), 3.0, epsilon = 1e-10);
// A sixth push evicts 1.0, leaving 2..=6 whose mean is 4.
buffer.push(6.0);
assert_relative_eq!(buffer.mean(), 4.0, epsilon = 1e-10); }
#[test]
fn test_lazy_computation() {
let data = vec![1.0, 2.0, 3.0, 4.0, 5.0];
let lazy = LazyStatComputation::new(data)
.mean()
.variance(1)
.quantile(0.5);
let results = lazy.compute().expect("Operation failed");
assert_eq!(results.len(), 3);
// mean = 3, sample variance (ddof = 1) = 2.5, median = 3.
assert_relative_eq!(results[0], 3.0, epsilon = 1e-10); assert_relative_eq!(results[1], 2.5, epsilon = 1e-10); assert_relative_eq!(results[2], 3.0, epsilon = 1e-10); }
#[test]
fn test_zero_copy_rolling() {
let data = array![1.0, 2.0, 3.0, 4.0, 5.0];
// Rolling mean over windows of 3: [1,2,3], [2,3,4], [3,4,5].
let results = zero_copy::rolling_stats_zerocopy(&data.view(), 3, |window| {
Ok(window.mean().expect("Operation failed"))
})
.expect("Operation failed");
assert_eq!(results.len(), 3);
assert_relative_eq!(results[0], 2.0, epsilon = 1e-10);
assert_relative_eq!(results[1], 3.0, epsilon = 1e-10);
assert_relative_eq!(results[2], 4.0, epsilon = 1e-10);
}
}