use scirs2_core::ndarray::{Array1, Array2};
use crate::error::KernelError;
use crate::kernel_pca::centering::{center_test_kernel, double_center, KernelCenteringStats};
use crate::kernel_pca::eigendecomp::{symmetric_eigendecomp, TopKEigen};
use crate::kernel_pca::error::{KernelPcaError, KernelPcaResult};
use crate::types::Kernel;
#[derive(Clone, Debug, PartialEq)]
pub struct KernelPcaConfig {
pub n_components: usize,
pub center: bool,
}
impl KernelPcaConfig {
pub fn new(n_components: usize) -> Self {
Self {
n_components,
center: true,
}
}
pub fn with_center(mut self, center: bool) -> Self {
self.center = center;
self
}
}
#[derive(Clone, Debug)]
pub struct KernelPCA<K: Kernel> {
kernel: K,
config: KernelPcaConfig,
}
impl<K: Kernel> KernelPCA<K> {
pub fn new(kernel: K, config: KernelPcaConfig) -> KernelPcaResult<Self> {
if config.n_components == 0 {
return Err(KernelPcaError::InvalidInput(
"KernelPCA::new: n_components must be >= 1".to_string(),
));
}
Ok(Self { kernel, config })
}
pub fn kernel(&self) -> &K {
&self.kernel
}
pub fn config(&self) -> &KernelPcaConfig {
&self.config
}
}
pub struct FittedKernelPCA<K: Kernel> {
kernel: Box<dyn Kernel>,
alphas: Array2<f64>,
eigenvalues: Array1<f64>,
training_data: Vec<Vec<f64>>,
centering_stats: KernelCenteringStats,
n_components: usize,
feature_dim: usize,
_marker: std::marker::PhantomData<K>,
}
impl<K: Kernel> std::fmt::Debug for FittedKernelPCA<K> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FittedKernelPCA")
.field("kernel_name", &self.kernel.name())
.field("n_components", &self.n_components)
.field("feature_dim", &self.feature_dim)
.field("n_training_points", &self.training_data.len())
.field("eigenvalues", &self.eigenvalues)
.finish()
}
}
impl<K: Kernel> FittedKernelPCA<K> {
pub fn n_components(&self) -> usize {
self.n_components
}
pub fn feature_dim(&self) -> usize {
self.feature_dim
}
pub fn eigenvalues(&self) -> &Array1<f64> {
&self.eigenvalues
}
pub fn alphas(&self) -> &Array2<f64> {
&self.alphas
}
pub fn training_data(&self) -> &[Vec<f64>] {
&self.training_data
}
pub fn centering_stats(&self) -> &KernelCenteringStats {
&self.centering_stats
}
pub fn explained_variance_ratio(&self) -> Array1<f64> {
let total: f64 = self.eigenvalues.iter().copied().sum();
if total <= 0.0 {
return Array1::<f64>::zeros(self.n_components);
}
let mut out = Array1::<f64>::zeros(self.n_components);
for (i, v) in self.eigenvalues.iter().enumerate() {
out[i] = v / total;
}
out
}
pub fn transform(&self, points: &[Vec<f64>]) -> KernelPcaResult<Array2<f64>> {
if points.is_empty() {
return Err(KernelPcaError::InvalidInput(
"FittedKernelPCA::transform: points must not be empty".to_string(),
));
}
let n_train = self.training_data.len();
let k = self.n_components;
let mut out = Array2::<f64>::zeros((points.len(), k));
for (pi, point) in points.iter().enumerate() {
if point.len() != self.feature_dim {
return Err(KernelPcaError::DimensionMismatch {
expected: self.feature_dim,
got: point.len(),
context: format!("FittedKernelPCA::transform: points[{}]", pi),
});
}
let mut k_test = vec![0.0f64; n_train];
for (ti, train_row) in self.training_data.iter().enumerate() {
k_test[ti] = self
.kernel
.compute(point, train_row)
.map_err(KernelPcaError::from_kernel)?;
}
let centered: Array1<f64> = center_test_kernel(&k_test, &self.centering_stats)?;
for c in 0..k {
let mut acc = 0.0f64;
for i in 0..n_train {
acc += centered[i] * self.alphas[(i, c)];
}
out[(pi, c)] = acc;
}
}
Ok(out)
}
}
impl<K> Kernel for KernelPCA<K>
where
K: Kernel,
{
fn compute(&self, _x: &[f64], _y: &[f64]) -> crate::error::Result<f64> {
Err(KernelError::InvalidParameter {
parameter: "KernelPCA".to_string(),
value: "not a kernel".to_string(),
reason: "KernelPCA is an estimator, not a Kernel; use fit/transform instead"
.to_string(),
})
}
fn name(&self) -> &str {
"KernelPCA"
}
fn is_psd(&self) -> bool {
false
}
}
pub(crate) trait KernelCloneExt {
fn clone_box(&self) -> Box<dyn Kernel>;
}
impl<K: Kernel + Clone + 'static> KernelCloneExt for K {
fn clone_box(&self) -> Box<dyn Kernel> {
Box::new(self.clone())
}
}
impl<K: Kernel + Clone + 'static> KernelPCA<K> {
pub fn build(kernel: K, config: KernelPcaConfig) -> KernelPcaResult<Self> {
Self::new(kernel, config)
}
pub fn fit(&self, training: &[Vec<f64>]) -> KernelPcaResult<FittedKernelPCA<K>> {
let n = training.len();
if n == 0 {
return Err(KernelPcaError::InvalidInput(
"KernelPCA::fit: training set must not be empty".to_string(),
));
}
let d = training[0].len();
if d == 0 {
return Err(KernelPcaError::InvalidInput(
"KernelPCA::fit: feature dimension must be >= 1".to_string(),
));
}
for (i, row) in training.iter().enumerate() {
if row.len() != d {
return Err(KernelPcaError::InvalidInput(format!(
"KernelPCA::fit: training[{}] has {} features (expected {})",
i,
row.len(),
d
)));
}
}
if self.config.n_components > n {
return Err(KernelPcaError::InvalidInput(format!(
"KernelPCA::fit: n_components ({}) cannot exceed training size ({})",
self.config.n_components, n
)));
}
let gram_rows = self
.kernel
.compute_matrix(training)
.map_err(KernelPcaError::from_kernel)?;
let mut gram = Array2::<f64>::zeros((n, n));
for i in 0..n {
if gram_rows[i].len() != n {
return Err(KernelPcaError::EigendecompositionFailed(format!(
"kernel.compute_matrix returned ragged row {} (len {}, expected {})",
i,
gram_rows[i].len(),
n
)));
}
for j in 0..n {
gram[(i, j)] = gram_rows[i][j];
}
}
for i in 0..n {
for j in (i + 1)..n {
let avg = 0.5 * (gram[(i, j)] + gram[(j, i)]);
gram[(i, j)] = avg;
gram[(j, i)] = avg;
}
}
let (centered, centering_stats) = if self.config.center {
double_center(&gram)?
} else {
(
gram.clone(),
KernelCenteringStats {
row_means: Array1::<f64>::zeros(n),
grand_mean: 0.0,
},
)
};
let TopKEigen {
eigenvalues,
eigenvectors,
} = symmetric_eigendecomp(¢ered, self.config.n_components)?;
let k = self.config.n_components;
let mut alphas = Array2::<f64>::zeros((n, k));
for c in 0..k {
let lam = eigenvalues[c];
if lam <= 0.0 {
return Err(KernelPcaError::InsufficientComponents {
requested: k,
available: c,
});
}
let scale = 1.0 / lam.sqrt();
for r in 0..n {
alphas[(r, c)] = eigenvectors[(r, c)] * scale;
}
}
Ok(FittedKernelPCA {
kernel: KernelCloneExt::clone_box(&self.kernel),
alphas,
eigenvalues,
training_data: training.to_vec(),
centering_stats,
n_components: k,
feature_dim: d,
_marker: std::marker::PhantomData,
})
}
pub fn fit_transform(
&self,
training: &[Vec<f64>],
) -> KernelPcaResult<(FittedKernelPCA<K>, Array2<f64>)> {
let fitted = self.fit(training)?;
let projected = fitted.transform(training)?;
Ok((fitted, projected))
}
}