use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::{BinaryHeap, HashSet};
use std::fmt::Debug;
use approx::relative_eq;
use rayon::prelude::*;
use smartcore::linalg::basic::arrays::{Array, Array2, MutArray};
use smartcore::linalg::basic::matrix::DenseMatrix;
use sprs::CsMat;
use crate::builder::ConfigValue;
use crate::graph::GraphLaplacian;
use crate::reduction::ImplicitProjection;
use crate::search::sorted_index::SortedLambdas;
use crate::search::taumode::TauMode;
use log::{debug, info, trace, warn};
#[derive(Clone, Debug)]
pub struct ArrowItem {
pub item: Vec<f64>,
pub lambda: f64,
}
#[derive(Clone, Debug)]
pub struct ArrowFeature {
pub feature: Vec<f64>,
}
impl ArrowItem {
#[inline]
pub fn new(item: &[f64], lambda: f64) -> Self {
trace!(
"Creating ArrowItem with {} dimensions, lambda: {:.6}",
item.len(),
lambda
);
Self {
item: item.to_vec(),
lambda,
}
}
#[inline]
pub fn len(&self) -> usize {
self.item.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.item.is_empty()
}
#[inline]
pub fn lambda_component_similarity(&self, other: &ArrowItem) -> f64 {
let lambda_diff = (self.lambda - other.lambda).abs();
1.0 - lambda_diff.min(1.0)
}
#[inline]
pub fn lambda_similarity(&self, other: &ArrowItem, alpha: f64) -> f64 {
assert_eq!(
self.item.len(),
other.item.len(),
"items should be of the same length"
);
let cosine_sim = self.cosine_similarity(&other.item);
let lambda_sim = self.lambda_component_similarity(other);
let result = alpha * cosine_sim + cosine_sim.signum() * (1.0 - alpha) * lambda_sim;
trace!(
"Lambda similarity: semantic={:.6}, lambda={:.6}, combined={:.6}",
cosine_sim, lambda_sim, result
);
result
}
#[inline]
pub fn dot(&self, other: &ArrowItem) -> f64 {
assert_eq!(self.len(), other.len(), "Dimension mismatch");
let result = self
.item
.iter()
.zip(other.item.iter())
.map(|(a, b)| a * b)
.sum();
trace!("Computed dot product: {:.6}", result);
result
}
#[inline]
pub fn norm(a: &[f64]) -> f64 {
let result = a.iter().map(|&x| x * x).sum::<f64>().sqrt();
trace!("Computed norm: {:.6}", result);
result
}
#[inline]
pub fn cosine_similarity(&self, other: &[f64]) -> f64 {
let denom = ArrowItem::norm(&self.item) * ArrowItem::norm(other);
let result = if denom > 0.0 {
self.dot(&ArrowItem::new(other, 0.0)) / denom
} else {
warn!("Zero vector encountered in cosine similarity computation");
0.0
};
trace!("Computed cosine similarity: {:.6}", result);
result
}
#[inline]
pub fn euclidean_distance(&self, other: &ArrowItem) -> f64 {
assert_eq!(self.len(), other.len(), "Dimension mismatch");
let result = self
.item
.iter()
.zip(other.item.iter())
.map(|(a, b)| (a - b).powi(2))
.sum::<f64>()
.sqrt();
trace!("Computed Euclidean distance: {:.6}", result);
result
}
#[inline]
pub fn add_inplace(&mut self, other: &ArrowItem) {
assert_eq!(self.len(), other.len(), "Dimension mismatch");
trace!("Adding vectors in-place");
self.item
.iter_mut()
.zip(other.item.iter())
.for_each(|(a, b)| *a += *b);
}
#[inline]
pub fn mul_inplace(&mut self, other: &ArrowItem) {
assert_eq!(self.len(), other.len(), "Dimension mismatch");
trace!("Multiplying vectors element-wise in-place");
self.item
.iter_mut()
.zip(other.item.iter())
.for_each(|(a, b)| *a *= *b);
}
#[inline]
pub fn scale(&mut self, scalar: f64) {
trace!("Scaling vector by {:.6}", scalar);
self.item.iter_mut().for_each(|x| *x *= scalar);
}
#[inline]
pub fn iter(&self) -> std::slice::Iter<'_, f64> {
self.item.iter()
}
#[inline]
pub fn iter_mut(&mut self) -> std::slice::IterMut<'_, f64> {
self.item.iter_mut()
}
}
#[derive(Debug, Clone, Copy)]
struct ScoredItem {
index: usize,
score: f64,
}
impl PartialEq for ScoredItem {
fn eq(&self, other: &Self) -> bool {
self.score == other.score
}
}
impl Eq for ScoredItem {}
impl PartialOrd for ScoredItem {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
other.score.partial_cmp(&self.score)
}
}
impl Ord for ScoredItem {
fn cmp(&self, other: &Self) -> Ordering {
self.partial_cmp(other).unwrap_or(Ordering::Equal)
}
}
#[derive(Clone, Debug)]
pub struct ArrowSpace {
pub nfeatures: usize, pub nitems: usize,
pub data: DenseMatrix<f64>, pub signals: CsMat<f64>, pub lambdas: Vec<f64>, pub lambdas_sorted: SortedLambdas, pub taumode: TauMode,
pub min_lambdas: f64,
pub max_lambdas: f64,
pub(crate) range_lambdas: f64,
pub n_clusters: usize,
pub cluster_assignments: Vec<Option<usize>>,
pub cluster_sizes: Vec<usize>,
pub cluster_radius: f64,
pub projection_matrix: Option<ImplicitProjection>, pub reduced_dim: Option<usize>, pub extra_reduced_dim: bool,
pub centroid_map: Option<Vec<usize>>, pub sub_centroids: Option<DenseMatrix<f64>>,
pub subcentroid_lambdas: Option<Vec<f64>>,
pub item_norms: Option<Vec<f64>>,
}
pub const TAUDEFAULT: TauMode = TauMode::Median;
impl Default for ArrowSpace {
fn default() -> Self {
debug!("Creating default ArrowSpace");
Self {
nfeatures: 0,
nitems: 0,
data: DenseMatrix::new(0, 0, Vec::new(), true).unwrap(),
signals: sprs::CsMat::zero((0, 0)),
lambdas: Vec::new(),
lambdas_sorted: SortedLambdas::new(),
min_lambdas: -1.0,
max_lambdas: -1.0,
range_lambdas: -1.0,
taumode: TAUDEFAULT,
n_clusters: 0,
cluster_assignments: Vec::new(),
cluster_sizes: Vec::new(),
cluster_radius: 0.0,
projection_matrix: None,
reduced_dim: None,
extra_reduced_dim: false,
centroid_map: None,
sub_centroids: None,
subcentroid_lambdas: None,
item_norms: None,
}
}
}
impl ArrowSpace {
pub(crate) fn new(items: Vec<Vec<f64>>, taumode: TauMode) -> Self {
assert!(!items.is_empty(), "items cannot be empty");
assert!(
items.len() > 1,
"cannot create a arrowspace of one arrow only"
);
let n_items = items.len(); let n_features = items[0].len(); Self {
nfeatures: n_features,
nitems: n_items,
data: DenseMatrix::from_2d_vec(&items).unwrap(),
signals: sprs::CsMat::zero((0, 0)), lambdas: vec![0.0; n_items], lambdas_sorted: SortedLambdas::new(),
min_lambdas: -1.0,
max_lambdas: -1.0,
range_lambdas: -1.0,
taumode,
n_clusters: 0,
cluster_assignments: Vec::new(),
cluster_sizes: Vec::new(),
cluster_radius: 0.0,
projection_matrix: None,
reduced_dim: None,
extra_reduced_dim: false,
centroid_map: None,
sub_centroids: None,
subcentroid_lambdas: None,
item_norms: None,
}
}
pub fn empty_with_projection(
proj_data: HashMap<String, ConfigValue>,
nrows: usize,
ncols: usize,
) -> Self {
debug!(
"ArrowSpace::empty_with_projection called with nrows={}, ncols={}",
nrows, ncols
);
let extra_reduced = proj_data["extra_reduced_dim"].as_bool().unwrap();
debug!("extra_reduced_dim from proj_data: {}", extra_reduced);
assert!(
extra_reduced == false,
"Reconstructing with extra dim reduction is not implemented yet"
);
let has_projection = proj_data["pj_mtx_original_dim"].as_usize().is_some();
debug!("projection present in proj_data: {}", has_projection);
let mut aspace = Self::default();
aspace.nitems = nrows;
aspace.nfeatures = ncols;
if has_projection {
let original_dim = proj_data["pj_mtx_original_dim"]
.as_usize()
.expect("pj_mtx_original_dim must be usize when projection is present");
let reduced_dim = proj_data["pj_mtx_reduced_dim"]
.as_usize()
.expect("pj_mtx_reduced_dim must be usize when projection is present");
let seed = proj_data["pj_mtx_seed"]
.as_u64()
.expect("pj_mtx_seed must be u64 when projection is present");
info!(
"Reconstructing ImplicitProjection: original_dim={}, reduced_dim={}, seed={}",
original_dim, reduced_dim, seed
);
aspace.projection_matrix = Some(ImplicitProjection {
original_dim,
reduced_dim,
seed,
});
aspace.reduced_dim = Some(reduced_dim);
aspace.extra_reduced_dim = extra_reduced;
} else {
warn!(
"empty_with_projection called without projection metadata; \
returning ArrowSpace without projection_matrix"
);
}
debug!(
"ArrowSpace::empty_with_projection created ArrowSpace \
with nitems={}, nfeatures={}, reduced_dim={:?}",
aspace.nitems, aspace.nfeatures, aspace.reduced_dim
);
aspace
}
pub fn from_config(config: HashMap<String, ConfigValue>) -> Self {
let nitems = config
.get("nitems")
.and_then(|v| v.as_usize())
.expect("from_config: missing nitems");
let nfeatures = config
.get("nfeatures")
.and_then(|v| v.as_usize())
.expect("from_config: missing nfeatures");
debug!(
"ArrowSpace::from_config called (nitems={}, nfeatures={})",
nitems, nfeatures
);
let projection_matrix = if let (
Some(ConfigValue::OptionUsize(Some(original_dim))),
Some(ConfigValue::OptionUsize(Some(reduced_dim))),
Some(ConfigValue::OptionU64(Some(seed))),
) = (
config.get("pj_mtx_original_dim"),
config.get("pj_mtx_reduced_dim"),
config.get("pj_mtx_seed"),
) {
info!(
"ArrowSpace::from_config: projection matrix used: original_dim={}, reduced_dim={}",
original_dim, reduced_dim
);
Some(ImplicitProjection {
original_dim: *original_dim,
reduced_dim: *reduced_dim,
seed: *seed,
})
} else {
debug!("ArrowSpace::from_config: projection matrix not used");
None
};
let reduced_dim = match config.get("pj_mtx_reduced_dim") {
Some(ConfigValue::OptionUsize(Some(d))) => Some(*d),
_ => None,
};
let extra_reduced_dim = config
.get("extra_reduced_dim")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let taumode = config
.get("taumode")
.and_then(|v| match v {
ConfigValue::TauMode(t) => Some(t.clone()),
_ => None,
})
.unwrap_or_default();
let n_clusters = config
.get("n_clusters")
.and_then(|v| v.as_usize())
.unwrap_or(0);
let cluster_radius = config
.get("cluster_radius")
.and_then(|v| v.as_f64())
.unwrap_or(0.0);
info!(
"ArrowSpace::from_config: n_clusters={}, cluster_radius={}",
n_clusters, cluster_radius
);
let data = DenseMatrix::new(0, 0, Vec::new(), true).unwrap();
let signals = sprs::CsMat::zero((0, 0));
let lambdas = vec![0.0; nitems];
let lambdas_sorted = SortedLambdas::new();
let aspace = ArrowSpace {
nfeatures,
nitems,
data,
signals,
lambdas,
lambdas_sorted,
min_lambdas: -1.0,
max_lambdas: -1.0,
range_lambdas: -1.0,
taumode,
n_clusters,
cluster_assignments: Vec::new(),
cluster_sizes: Vec::new(),
cluster_radius,
projection_matrix,
reduced_dim,
extra_reduced_dim,
centroid_map: None,
sub_centroids: None,
subcentroid_lambdas: None,
item_norms: None,
};
debug!(
"ArrowSpace::from_config created ArrowSpace: nitems={}, nfeatures={}, n_clusters={}, reduced_dim={:?}, extra_reduced_dim={}",
aspace.nitems,
aspace.nfeatures,
aspace.n_clusters,
aspace.reduced_dim,
aspace.extra_reduced_dim
);
aspace
}
pub fn drop_data(&mut self) {
info!("Freeing raw input memory, should have been persisted to file");
self.data = DenseMatrix::new(0, 0, vec![], true).unwrap();
}
pub(crate) fn new_from_dense(items: DenseMatrix<f64>, taumode: TauMode) -> Self {
let n_items = items.shape().0; let n_features = items.shape().1; let empty = items.is_empty();
info!(r#"new_from_dense: {n_items}x{n_features} -> empty: {empty}"#);
assert!(
items.shape().0 > 1,
"cannot create a arrowspace of one arrow only"
);
Self {
nfeatures: n_features,
nitems: n_items,
data: items,
signals: sprs::CsMat::zero((0, 0)), lambdas: vec![0.0; n_items], lambdas_sorted: SortedLambdas::new(),
min_lambdas: -1.0,
max_lambdas: -1.0,
range_lambdas: -1.0,
taumode,
n_clusters: 0,
cluster_assignments: Vec::new(),
cluster_sizes: Vec::new(),
cluster_radius: 0.0,
projection_matrix: None,
reduced_dim: None,
extra_reduced_dim: false,
centroid_map: None,
sub_centroids: None,
subcentroid_lambdas: None,
item_norms: None,
}
}
#[inline]
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn from_items(items: Vec<Vec<f64>>, taumode: TauMode) -> Self {
warn!(
"This is just a test method. Use ArrowSpaceBuilder. Creating ArrowSpace from {} items with custom tau mode: {:?}",
items.len(),
taumode
);
let mut aspace = Self::from_items_default(items);
aspace.taumode = taumode;
aspace
}
#[inline]
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn from_items_default(items: Vec<Vec<f64>>) -> Self {
assert!(!items.is_empty(), "items cannot be empty");
assert!(
items.len() > 1,
"cannot create a arrowspace of one arrow only"
);
let n_items = items.len(); let n_features = items[0].len();
warn!(
"This is a test method, use ArrowSpaceBuilder. Creating ArrowSpace from {} items with {} features",
n_items, n_features
);
debug!("Using default tau mode: {:?}", TAUDEFAULT);
assert!(
items.iter().all(|item| item.len() == n_features),
"All items must have identical number of features"
);
trace!("Constructing DenseMatrix from 2D vector");
let data_matrix = DenseMatrix::from_2d_vec(&items).unwrap();
debug!("ArrowSpace data matrix shape: {:?}", data_matrix.shape());
Self {
nfeatures: n_features,
nitems: n_items,
data: data_matrix,
signals: sprs::CsMat::zero((0, 0)), lambdas: vec![0.0; n_items], lambdas_sorted: SortedLambdas::new(),
min_lambdas: -1.0,
max_lambdas: -1.0,
range_lambdas: -1.0,
taumode: TAUDEFAULT,
n_clusters: 0,
cluster_assignments: Vec::new(),
cluster_sizes: Vec::new(),
cluster_radius: 0.0,
projection_matrix: None,
reduced_dim: None,
extra_reduced_dim: false,
centroid_map: None,
sub_centroids: None,
subcentroid_lambdas: None,
item_norms: None,
}
}
pub(crate) fn subcentroids_from_dense_matrix(matrix: DenseMatrix<f64>) -> Self {
let (n_rows, n_cols) = matrix.shape();
let nitems = n_rows;
let nfeatures = n_cols;
info!(
"Creating subcentroid ArrowSpace from DenseMatrix({}, {})",
n_rows, n_cols
);
info!(
"→ Interpreted as: {} subcentroids × {} features (row-major)",
nitems, nfeatures
);
Self {
data: matrix,
nitems,
nfeatures,
signals: sprs::CsMat::zero((0, 0)),
lambdas: vec![0.0; nitems],
lambdas_sorted: SortedLambdas::new(),
min_lambdas: -1.0,
max_lambdas: -1.0,
range_lambdas: -1.0,
taumode: TAUDEFAULT,
n_clusters: 0,
cluster_assignments: Vec::new(),
cluster_sizes: Vec::new(),
cluster_radius: 0.0,
projection_matrix: None,
reduced_dim: None,
extra_reduced_dim: false,
centroid_map: None,
sub_centroids: None,
subcentroid_lambdas: None,
item_norms: None,
}
}
pub fn project_query(&self, query: &[f64]) -> Vec<f64> {
assert_eq!(
query.len(),
self.nfeatures,
"Query dimension {} doesn't match index original dimension {}",
query.len(),
self.nfeatures
);
if let Some(ref proj) = self.projection_matrix {
trace!(
"Projecting query: {} → {} dimensions using seed-based projection",
self.nfeatures,
self.reduced_dim.unwrap()
);
proj.project(query)
} else {
trace!("No projection applied, returning original query");
query.to_vec()
}
}
pub fn prepare_query_item(&self, query: &[f64], gl: &GraphLaplacian) -> f64 {
assert!(
query.iter().all(|x| x.is_finite()),
"query item has non-finite values"
);
if let (Some(subcentroids), Some(sc_lambdas)) =
(&self.sub_centroids, &self.subcentroid_lambdas)
{
let mut best_idx = 0;
let mut best_dist = f64::INFINITY;
for sc_idx in 0..subcentroids.shape().0 {
let query = if self.extra_reduced_dim {
&self.project_query(query)
} else {
query
};
let dist: f64 = query
.iter()
.zip(subcentroids.get_row(sc_idx).iterator(0))
.map(|(a, b)| (a - b).powi(2))
.sum::<f64>()
.sqrt();
if dist < best_dist {
best_dist = dist;
best_idx = sc_idx;
}
}
let lambda = sc_lambdas[best_idx];
trace!(
"Query mapped to subcentroid {}/{} with λ={:.6} (dist={:.4})",
best_idx,
subcentroids.shape().0,
lambda,
best_dist
);
return lambda;
}
let tau = TauMode::select_tau(&query, self.taumode);
let raw_lambda = TauMode::compute_synthetic_lambda(
&query,
self.projection_matrix.clone(),
&gl.matrix,
tau,
);
let msg = "Check your eps parameter for the builder, every dataset has an optimal eps. \n \
Also, the query item may be out of context for the dataset (undecidable), \
despite all safeguards its lambda is 0.0";
if self.range_lambdas.is_finite() {
if relative_eq!(raw_lambda, 0.0, epsilon = 1e-12) {
panic!("{}", msg)
}
return self.normalise_query_lambda(raw_lambda);
} else {
if relative_eq!(raw_lambda, 0.0, epsilon = 1e-12) {
panic!("{}", msg)
}
return raw_lambda;
}
}
pub fn build_lambdas_sorted(&mut self) {
self.lambdas_sorted.build_from(&self.lambdas);
}
#[inline]
pub fn lambdas(&self) -> &[f64] {
self.lambdas.as_ref()
}
#[inline]
pub fn cluster_of(&self, i: usize) -> Option<usize> {
self.cluster_assignments.get(i).copied().flatten()
}
#[inline]
pub fn get_feature(&self, i: usize) -> ArrowFeature {
assert!(i < self.nfeatures, "feature index out of bounds");
trace!("Extracting feature {} from ArrowSpace", i);
ArrowFeature {
feature: self.data.get_col(i).iterator(0).copied().collect(),
}
}
#[allow(dead_code)]
fn set_feature(&mut self, f: usize, values: ArrowFeature) {
assert!(f < self.nfeatures, "feature index out of bounds");
debug!("Setting feature {} in-place", f);
for i in 0..self.nitems {
self.data.set((i, f), values.feature[i]);
}
}
#[inline]
pub fn get_item(&self, i: usize) -> ArrowItem {
assert!(i < self.nitems, "item index out of bounds");
trace!("Extracting item {} with lambda {:.6}", i, self.lambdas[i]);
ArrowItem::new(
self.data
.get_row(i)
.iterator(0)
.copied()
.collect::<Vec<f64>>()
.as_ref(),
self.lambdas[i],
)
}
fn set_item(&mut self, i: usize, values: ArrowItem) {
assert!(i < self.nitems, "item index out of bounds");
debug!("Setting item {} in-place", i);
for f in 0..self.nfeatures {
self.data.set((i, f), values.item[f]);
}
}
#[inline]
pub fn add_items(&mut self, a: usize, b: usize, gl: &GraphLaplacian) {
assert!(
a < self.nitems && b < self.nitems,
"Item indices out of bounds: a={}, b={}, ncols={}",
a,
b,
self.nitems
);
assert_eq!(
gl.nnodes, self.nitems,
"Laplacian nodes must match number of items"
);
debug!("Adding item {} into item {}", b, a);
debug!(
"Graph Laplacian has {} nodes, ArrowSpace has {} items",
gl.nnodes, self.nitems
);
let mut item_a = self.get_item(a);
let item_b = self.get_item(b);
item_a.add_inplace(&item_b);
self.set_item(a, item_a);
debug!("Recomputing lambdas after item addition");
self.recompute_lambdas(gl);
}
#[inline]
pub fn mul_items(&mut self, a: usize, b: usize, gl: &GraphLaplacian) {
assert!(
a < self.nitems && b < self.nitems,
"Item indices out of bounds: a={}, b={}, ncols={}",
a,
b,
self.nitems
);
assert_eq!(
gl.nnodes, self.nitems,
"Laplacian nodes must match number of items"
);
debug!("Multiplying item {} by item {}", a, b);
let mut item_a = self.get_item(a);
let item_b = self.get_item(b);
item_a.mul_inplace(&item_b);
self.set_item(a, item_a);
debug!("Recomputing lambdas after item multiplication");
self.recompute_lambdas(gl);
}
#[inline]
pub fn scale_item(&mut self, a: usize, scalar: f64, gl: &GraphLaplacian) {
assert!(
a < self.nitems,
"Item index out of bounds: a={}, ncols={}",
a,
self.nitems
);
assert_eq!(
gl.nnodes, self.nitems,
"Laplacian nodes must match number of items"
);
debug!("Scaling item {} by factor {:.6}", a, scalar);
let mut item_a = self.get_item(a);
item_a.scale(scalar);
self.set_item(a, item_a);
debug!("Recomputing lambdas after item scaling");
self.recompute_lambdas(gl);
}
#[inline]
pub fn recompute_lambdas(&mut self, gl: &GraphLaplacian) {
debug!("Recomputing lambdas with tau mode: {:?}", self.taumode);
TauMode::compute_taumode_lambdas_parallel(self, gl, self.taumode);
let lambda_stats = {
let min = self.lambdas.iter().fold(f64::INFINITY, |a, &b| a.min(b));
let max: f64 = self.lambdas.iter().fold(0.0, |a, &b| a.max(b));
let mean = self.lambdas.iter().sum::<f64>() / self.lambdas.len() as f64;
(min, max, mean)
};
debug!(
"Lambda recomputation completed - min: {:.6}, max: {:.6}, mean: {:.6}",
lambda_stats.0, lambda_stats.1, lambda_stats.2
);
}
#[inline]
pub fn search_lambda_aware(
&self,
query: &ArrowItem,
k: usize,
alpha: f64,
) -> Vec<(usize, f64)> {
info!("Lambda-aware search: k={}", k);
debug!(
"Query vector dimension: {}, lambda: {:.6}",
query.len(),
query.lambda
);
assert_ne!(
query.lambda, 0.0,
"Lambda of the item is 0.0, prepare the item before searching"
);
let mut results: Vec<_> = (0..self.nitems)
.map(|i| {
let item = self.get_item(i);
let similarity = query.lambda_similarity(&item, alpha);
(i, similarity)
})
.collect();
results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
results.truncate(k);
debug!("Search completed, returning {} results", results.len());
if !results.is_empty() {
trace!(
"Top result: index={}, score={:.6}",
results[0].0, results[0].1
);
}
results
}
#[inline]
pub fn search_lambda_aware_hybrid(
&self,
query: &ArrowItem,
k: usize,
alpha: f64,
) -> Vec<(usize, f64)> {
info!("Hybrid search: k={}, alpha={}", k, alpha);
if k == 0 {
return Vec::new();
}
let beta = 1.0 - alpha;
let semantic_threshold = 0.9999;
let (lambda_heap, semantic_top, high_semantic_vec) = (0..self.nitems)
.into_par_iter()
.fold(
|| {
(
BinaryHeap::with_capacity(k), (0usize, f64::NEG_INFINITY), Vec::new(), )
},
|(mut heap, mut sem_best, mut high_sem), i| {
let item = self.get_item(i);
let cosine = query.cosine_similarity(&item.item);
let lambda_component = query.lambda_component_similarity(&item);
let lambda_score = alpha * cosine + beta * lambda_component;
if cosine > sem_best.1 {
sem_best = (i, cosine);
}
if cosine > semantic_threshold {
high_sem.push((i, cosine));
}
if heap.len() < k {
heap.push(ScoredItem {
index: i,
score: lambda_score,
});
} else if let Some(&min) = heap.peek() {
if lambda_score > min.score {
heap.pop();
heap.push(ScoredItem {
index: i,
score: lambda_score,
});
}
}
(heap, sem_best, high_sem)
},
)
.reduce(
|| (BinaryHeap::new(), (0, f64::NEG_INFINITY), Vec::new()),
|(mut h1, s1, mut hs1), (h2, s2, hs2)| {
let sem = if s1.1 > s2.1 { s1 } else { s2 };
hs1.extend(hs2);
for item in h2 {
if h1.len() < k {
h1.push(item);
} else if let Some(&min) = h1.peek() {
if item.score > min.score {
h1.pop();
h1.push(item);
}
}
}
(h1, sem, hs1)
},
);
debug!(
"Semantic top: idx={}, score={:.6}, high_semantic_count={}",
semantic_top.0,
semantic_top.1,
high_semantic_vec.len()
);
let mut result_set = HashSet::with_capacity(k + high_semantic_vec.len());
let mut score_map = std::collections::HashMap::with_capacity(result_set.capacity());
for (idx, score) in high_semantic_vec {
result_set.insert(idx);
score_map.insert(idx, score);
}
for item in lambda_heap.into_sorted_vec().into_iter().rev() {
result_set.insert(item.index);
score_map.entry(item.index).or_insert(item.score);
}
result_set.insert(semantic_top.0);
score_map.entry(semantic_top.0).or_insert(semantic_top.1);
let mut final_results: Vec<(usize, f64)> = result_set
.into_iter()
.map(|idx| (idx, *score_map.get(&idx).unwrap()))
.collect();
final_results.sort_unstable_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
final_results.truncate(k);
debug!("Final: {} results", final_results.len());
final_results
}
pub fn search_linear_sorted(
&self,
query: &[f64],
gl: &GraphLaplacian,
k: usize,
) -> Vec<(usize, f64)> {
let q_lambda = self.prepare_query_item(query, gl); self.lambdas_sorted
.range_bylambda(q_lambda, k, gl.graph_params.p)
}
#[inline]
pub fn normalise_lambdas(&mut self) {
self.min_lambdas = self.lambdas.iter().fold(f64::INFINITY, |a, &b| a.min(b));
self.max_lambdas = self.lambdas.iter().fold(0.0_f64, |a, &b| a.max(b));
self.range_lambdas = (self.max_lambdas - self.min_lambdas).max(1e-9);
for lambda in &mut self.lambdas {
*lambda = (*lambda - self.min_lambdas) / self.range_lambdas;
}
info!(
"Normalized lambdas to [0, 1] range (original spread: {:.6})",
self.range_lambdas
);
}
#[inline]
pub fn normalise_query_lambda(&self, raw_lambda: f64) -> f64 {
debug_assert!(
self.range_lambdas > 0.0,
"Call normalise_lambdas() before normalising query lambdas"
);
let normalized = (raw_lambda - self.min_lambdas) / self.range_lambdas;
normalized.clamp(0.0, 1.0)
}
#[inline]
pub fn range_search(
&self,
query: &ArrowItem,
gl: &GraphLaplacian,
eps: f64,
) -> Vec<(usize, f64)> {
info!("Range search with radius: {:.6}", eps);
debug!("Query vector dimension: {}", query.len());
let query: ArrowItem = if relative_eq!(query.lambda, 0.0, epsilon = 1e-9) {
ArrowItem::new(
query.item.as_ref(),
self.prepare_query_item(&query.item, gl),
)
} else {
query.clone()
};
let results: Vec<(usize, f64)> = (0..self.nitems)
.filter_map(|i| {
let item = self.get_item(i);
let distance = query.lambda - item.lambda;
if distance <= eps {
Some((i, distance))
} else {
None
}
})
.collect();
debug!(
"Range search completed, found {} items within radius",
results.len()
);
results
}
pub fn update_lambdas(&mut self, new_lambdas: Vec<f64>) {
assert_eq!(
new_lambdas.len(),
self.lambdas.len(),
"New lambdas length must match existing lambdas length"
);
info!("Updating lambdas with {} new values", new_lambdas.len());
let old_stats = {
let min = self.lambdas.iter().fold(f64::INFINITY, |a, &b| a.min(b));
let max: f64 = self.lambdas.iter().fold(0.0, |a, &b| a.max(b));
(min, max)
};
self.lambdas = new_lambdas;
self.normalise_lambdas();
let new_stats = {
let min = self.lambdas.iter().fold(f64::INFINITY, |a, &b| a.min(b));
let max: f64 = self.lambdas.iter().fold(0.0, |a, &b| a.max(b));
(min, max)
};
debug!(
"Lambda update: old range [{:.6}, {:.6}] -> new range [{:.6}, {:.6}]",
old_stats.0, old_stats.1, new_stats.0, new_stats.1
);
}
pub fn arrowspace_config_typed(&self) -> HashMap<String, ConfigValue> {
let mut config = HashMap::new();
config.insert("nitems".to_string(), ConfigValue::Usize(self.nitems));
config.insert("nfeatures".to_string(), ConfigValue::Usize(self.nfeatures));
config.insert(
"min_lambdas".to_string(),
ConfigValue::F64(self.min_lambdas),
);
config.insert(
"max_lambdas".to_string(),
ConfigValue::F64(self.max_lambdas),
);
config.insert(
"range_lambdas".to_string(),
ConfigValue::F64(self.range_lambdas),
);
if self.projection_matrix.is_some() {
config.insert(
"pj_mtx_original_dim".to_string(),
ConfigValue::OptionUsize(Some(
self.projection_matrix.as_ref().unwrap().original_dim,
)),
);
config.insert(
"pj_mtx_reduced_dim".to_string(),
ConfigValue::OptionUsize(Some(
self.projection_matrix.as_ref().unwrap().reduced_dim,
)),
);
config.insert(
"pj_mtx_seed".to_string(),
ConfigValue::OptionU64(Some(self.projection_matrix.as_ref().unwrap().seed)),
);
config.insert(
"extra_reduced_dim".to_string(),
ConfigValue::Bool(self.extra_reduced_dim),
);
} else {
config.insert(
"pj_mtx_original_dim".to_string(),
ConfigValue::OptionUsize(None),
);
config.insert(
"pj_mtx_reduced_dim".to_string(),
ConfigValue::OptionUsize(None),
);
config.insert("pj_mtx_seed".to_string(), ConfigValue::OptionU64(None));
config.insert("extra_reduced_dim".to_string(), ConfigValue::Bool(false));
}
config.insert(
"taumode".to_string(),
ConfigValue::TauMode(self.taumode.clone()),
);
config.insert(
"n_clusters".to_string(),
ConfigValue::Usize(self.n_clusters),
);
config.insert(
"cluster_radius".to_string(),
ConfigValue::F64(self.cluster_radius),
);
config.insert(
"min_lambdas".to_string(),
ConfigValue::F64(self.min_lambdas),
);
config.insert(
"max_lambdas".to_string(),
ConfigValue::F64(self.max_lambdas),
);
config.insert(
"range_lambdas".to_string(),
ConfigValue::F64(self.range_lambdas),
);
config
}
#[cfg(feature = "storage")]
pub fn new_from_storage(
storage_path: impl AsRef<std::path::Path>,
dataset_name: &str,
) -> Result<Self, crate::storage::StorageError> {
use crate::reduction::ImplicitProjection;
use crate::storage::parquet::load_lambda;
let base_path = storage_path.as_ref();
let metadata_path = base_path.join(format!("{}-arrowspace_metadata.json", dataset_name));
info!("Loading storage from {}", metadata_path.display());
let metadata_content = std::fs::read_to_string(&metadata_path).map_err(|e| {
crate::storage::StorageError::Io(format!("Failed to read arrowspace metadata: {}", e))
})?;
let config: HashMap<String, crate::builder::ConfigValue> =
serde_json::from_str(&metadata_content).map_err(|e| {
crate::storage::StorageError::Invalid(format!(
"Failed to parse arrowspace metadata: {}",
e
))
})?;
let raw_path = base_path.join(format!("{}-raw_input.parquet", dataset_name));
let data = crate::storage::parquet::load_dense_matrix(&raw_path)?;
let (nitems, nfeatures) = data.shape();
let lambdas_path = base_path.join(format!("{}-lambdas.parquet", dataset_name));
let lambdas = load_lambda(&lambdas_path)?;
if lambdas.len() != nitems {
return Err(crate::storage::StorageError::Invalid(format!(
"Lambda count ({}) doesn't match items ({})",
lambdas.len(),
nitems
)));
}
let projection_matrix = if let (Some(orig), Some(red), Some(seed)) = (
config
.get("pj_mtx_original_dim")
.and_then(|v| Some(v.as_usize()))
.unwrap_or(None),
config
.get("pj_mtx_reduced_dim")
.and_then(|v| Some(v.as_usize()))
.unwrap_or(None),
config
.get("pj_mtx_seed")
.and_then(|v| Some(v.as_u64()))
.unwrap_or(None),
) {
info!(
"Restoring ImplicitProjection: {} -> {} (seed: {})",
orig, red, seed
);
Some(ImplicitProjection::new(orig, red, Some(seed)))
} else {
None
};
let reduced_dim = config
.get("pj_mtx_reduced_dim")
.and_then(|v| Some(v.as_usize()))
.unwrap_or(None);
let min_lambdas = config
.get("min_lambdas")
.and_then(|v| v.as_f64())
.unwrap_or_else(|| lambdas.iter().fold(f64::INFINITY, |a, &b| a.min(b)));
let max_lambdas = config
.get("max_lambdas")
.and_then(|v| v.as_f64())
.unwrap_or_else(|| lambdas.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b)));
let range_lambdas = config
.get("range_lambdas")
.and_then(|v| v.as_f64())
.unwrap_or(max_lambdas - min_lambdas);
let taumode = config
.get("taumode")
.and_then(|v| v.as_tau_mode())
.unwrap_or(TauMode::Median);
let n_clusters = config
.get("n_clusters")
.and_then(|v| v.as_usize())
.unwrap_or(0);
let cluster_radius = config
.get("cluster_radius")
.and_then(|v| v.as_f64())
.unwrap_or(0.0);
let extra_reduced_dim = config
.get("extra_reduced_dim")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let mut aspace = Self {
nfeatures,
nitems,
data,
signals: sprs::CsMat::zero((0, 0)),
lambdas,
lambdas_sorted: crate::search::sorted_index::SortedLambdas::new(),
min_lambdas,
max_lambdas,
range_lambdas,
taumode,
n_clusters,
cluster_assignments: vec![],
cluster_sizes: vec![],
cluster_radius,
projection_matrix: projection_matrix.clone(),
reduced_dim,
extra_reduced_dim,
centroid_map: None,
sub_centroids: None,
subcentroid_lambdas: None,
item_norms: None,
};
aspace.build_lambdas_sorted();
info!(
"Loaded ArrowSpace: {} items × {} features, projection: {}, tau: {:?}",
nitems,
nfeatures,
projection_matrix.is_some(),
taumode
);
Ok(aspace)
}
}
impl AsRef<DenseMatrix<f64>> for ArrowSpace {
#[inline]
fn as_ref(&self) -> &DenseMatrix<f64> {
&self.data
}
}
impl AsMut<DenseMatrix<f64>> for ArrowSpace {
#[inline]
fn as_mut(&mut self) -> &mut DenseMatrix<f64> {
&mut self.data
}
}
impl<'a> IntoIterator for &'a ArrowItem {
type Item = &'a f64;
type IntoIter = std::slice::Iter<'a, f64>;
#[inline]
fn into_iter(self) -> Self::IntoIter {
self.item.iter()
}
}
impl<'a> IntoIterator for &'a mut ArrowItem {
type Item = &'a mut f64;
type IntoIter = std::slice::IterMut<'a, f64>;
#[inline]
fn into_iter(self) -> Self::IntoIter {
self.item.iter_mut()
}
}
pub fn densematrix_to_vecvec(matrix: &DenseMatrix<f64>) -> Vec<Vec<f64>> {
let (rows, cols) = matrix.shape();
(0..rows)
.map(|r| (0..cols).map(|c| matrix.get((r, c)).clone()).collect())
.collect()
}