use ndarray::{Array1, Array2};
use std::sync::Arc;
use crate::column::Column;
use crate::error::{Error, Result};
use crate::gpu::operations::{GpuAccelerated, GpuMatrix, GpuVector};
use crate::gpu::{get_gpu_manager, GpuError, GpuManager};
use crate::optimized::split_dataframe::core::{ColumnView, OptimizedDataFrame};
impl GpuAccelerated for OptimizedDataFrame {
    /// Returns a copy of this DataFrame marked for GPU processing.
    ///
    /// No device transfer happens here; the individual GPU operations
    /// decide at call time whether to offload.
    fn gpu_accelerate(&self) -> Result<Self> {
        Ok(self.clone())
    }

    /// Whether this frame is large enough for GPU offloading to pay off.
    fn is_gpu_acceleratable(&self) -> bool {
        /// Minimum row count before GPU acceleration is considered.
        const MIN_GPU_ROWS: usize = 10_000;
        self.row_count() >= MIN_GPU_ROWS
    }
}
impl OptimizedDataFrame {
/// Multiplies two matrices extracted from this DataFrame's columns.
///
/// The left operand is built from `columns1` and the right operand from
/// `columns2` (each row of the frame becomes a matrix row). The product is
/// computed on the GPU when one is available and the frame meets the
/// configured `min_size_threshold`; on GPU failure it falls back to the CPU
/// if `fallback_to_cpu` is set, otherwise the GPU error is returned.
///
/// # Errors
/// Returns an error if a referenced column is missing or non-numeric, or if
/// the GPU path fails with CPU fallback disabled.
///
/// NOTE(review): ndarray's `dot` panics (rather than erroring) when the
/// inner dimensions disagree, i.e. when `columns1.len() != row_count()` —
/// confirm callers guarantee conforming shapes.
pub fn matrix_multiply(&self, columns1: &[&str], columns2: &[&str]) -> Result<Array2<f64>> {
    let gpu_manager = get_gpu_manager()?;
    let use_gpu = gpu_manager.is_available()
        && self.row_count() >= gpu_manager.context().config().min_size_threshold;

    let matrix1 = self.to_matrix(columns1)?;
    let matrix2 = self.to_matrix(columns2)?;

    if !use_gpu {
        // Small frame or no device: plain CPU product.
        return Ok(matrix1.dot(&matrix2));
    }

    // Clones are kept so the originals remain available for CPU fallback.
    let lhs = GpuMatrix::new(matrix1.clone());
    let rhs = GpuMatrix::new(matrix2.clone());
    match lhs.dot(&rhs) {
        Ok(product) => Ok(product.data),
        // GPU failed but the configuration allows recomputing on the CPU.
        Err(_) if gpu_manager.context().config().fallback_to_cpu => {
            Ok(matrix1.dot(&matrix2))
        }
        Err(e) => Err(e),
    }
}
/// Converts the named columns into a dense `row_count x columns.len()`
/// `f64` matrix.
///
/// Numeric columns are copied as-is (integers cast to `f64`, booleans as
/// 1.0/0.0); missing values and per-cell read failures become 0.0. String
/// columns are rejected.
///
/// # Errors
/// Returns an error if a column is missing or is a string column.
fn to_matrix(&self, columns: &[&str]) -> Result<Array2<f64>> {
    let n_rows = self.row_count();
    let mut matrix = Array2::zeros((n_rows, columns.len()));

    for (col_idx, col_name) in columns.iter().copied().enumerate() {
        let col_view = self.column(col_name)?;
        match &col_view.column {
            Column::Float64(col) => {
                for row_idx in 0..n_rows {
                    // Failed reads and nulls both collapse to 0.0.
                    matrix[[row_idx, col_idx]] =
                        col.get(row_idx).unwrap_or_default().unwrap_or_default();
                }
            }
            Column::Int64(col) => {
                for row_idx in 0..n_rows {
                    let cell: i64 = col.get(row_idx).unwrap_or_default().unwrap_or_default();
                    matrix[[row_idx, col_idx]] = cell as f64;
                }
            }
            Column::Boolean(col) => {
                for row_idx in 0..n_rows {
                    let flag = col.get(row_idx).unwrap_or_default().unwrap_or_default();
                    // Encode booleans as indicator values.
                    matrix[[row_idx, col_idx]] = if flag { 1.0 } else { 0.0 };
                }
            }
            Column::String(_) => {
                return Err(Error::Type(format!(
                    "Cannot convert string column '{}' to numeric matrix",
                    col_name
                )));
            }
        }
    }
    Ok(matrix)
}
/// Computes the Pearson correlation matrix for the given numeric columns.
///
/// When a GPU is available and the frame has at least the configured
/// `min_size_threshold` rows, the covariance step is computed via a single
/// centered matrix product; otherwise a plain element-wise CPU
/// implementation is used. Both paths yield an `n_cols x n_cols`
/// symmetric matrix with a unit diagonal.
///
/// # Errors
/// Returns an error if any referenced column is missing or non-numeric.
///
/// NOTE(review): with fewer than 2 rows the `n_rows - 1` divisor
/// underflows (or divides by zero), and a zero-variance column yields NaN
/// off-diagonal entries — both behaviors are inherited from the original
/// implementation and should be guarded at a higher level.
pub fn corr_matrix(&self, columns: &[&str]) -> Result<Array2<f64>> {
    let gpu_manager = get_gpu_manager()?;
    let use_gpu = gpu_manager.is_available()
        && self.row_count() >= gpu_manager.context().config().min_size_threshold;

    let data_matrix = self.to_matrix(columns)?;
    let n_cols = columns.len();
    let n_rows = self.row_count();

    if use_gpu {
        // Fix: the previous version also wrapped a full clone of the raw
        // data matrix in an unused `GpuMatrix` binding — that dead variable
        // (and its extra allocation) has been removed.

        // Center each column at its mean so that X^T * X gives the
        // covariance sums directly.
        let mut centered_data = data_matrix.clone();
        for col_idx in 0..n_cols {
            let col_mean = data_matrix.column(col_idx).mean().unwrap_or(0.0);
            for row_idx in 0..n_rows {
                centered_data[[row_idx, col_idx]] -= col_mean;
            }
        }

        let gpu_centered = GpuMatrix::new(centered_data);
        // Sample covariance: divide by (n - 1), Bessel's correction.
        let cov_matrix =
            gpu_centered.data.t().dot(&gpu_centered.data) / (n_rows - 1) as f64;

        // Normalize covariances by the column standard deviations.
        let mut corr_matrix = Array2::zeros((n_cols, n_cols));
        for i in 0..n_cols {
            for j in 0..n_cols {
                corr_matrix[[i, j]] = if i == j {
                    1.0
                } else {
                    cov_matrix[[i, j]]
                        / (cov_matrix[[i, i]].sqrt() * cov_matrix[[j, j]].sqrt())
                };
            }
        }
        Ok(corr_matrix)
    } else {
        compute_corr_matrix_cpu(&data_matrix)
    }
}
}
/// CPU fallback: Pearson correlation matrix computed element-wise.
///
/// Produces an `n_cols x n_cols` symmetric matrix with a unit diagonal.
/// A zero-variance column makes its off-diagonal entries NaN
/// (0/0 in the normalization).
fn compute_corr_matrix_cpu(data_matrix: &Array2<f64>) -> Result<Array2<f64>> {
    let n_rows = data_matrix.shape()[0];
    let n_cols = data_matrix.shape()[1];

    // Per-column means, used to center the data on the fly.
    let means: Vec<f64> = (0..n_cols)
        .map(|c| data_matrix.column(c).mean().unwrap_or(0.0))
        .collect();

    let mut corr_matrix = Array2::zeros((n_cols, n_cols));
    for i in 0..n_cols {
        corr_matrix[[i, i]] = 1.0;
        // Only the upper triangle is computed; the matrix is symmetric.
        for j in (i + 1)..n_cols {
            let mut cov_sum = 0.0;
            let mut var_i_sum = 0.0;
            let mut var_j_sum = 0.0;
            for row_idx in 0..n_rows {
                let d_i = data_matrix[[row_idx, i]] - means[i];
                let d_j = data_matrix[[row_idx, j]] - means[j];
                cov_sum += d_i * d_j;
                var_i_sum += d_i * d_i;
                var_j_sum += d_j * d_j;
            }
            // The (n - 1) factors cancel, so raw sums normalize directly.
            let corr_ij = cov_sum / (var_i_sum.sqrt() * var_j_sum.sqrt());
            corr_matrix[[i, j]] = corr_ij;
            corr_matrix[[j, i]] = corr_ij;
        }
    }
    Ok(corr_matrix)
}