use anyhow::anyhow;
use num_traits::Float;
use num_traits::cast::FromPrimitive;
use std::io::Write;
use indexmap::set::*;
use std::cmp::Ordering;
use quantiles::ckms::CKMS;
use rayon::prelude::*;
use hnsw_rs::prelude::*;
use crate::tools::{dimension::*, nodeparam::*};
use crate::tools::reservoir::unweighted_reservoir;
use permutation::Permutation;
use rand::SeedableRng;
use rand::distr::Distribution;
use rand_xoshiro::Xoshiro256PlusPlus;
struct RangeNghb<F: Float>(F, F);
pub struct KGraphStat<F: Float> {
ranges: Vec<RangeNghb<F>>,
in_degrees: Vec<u32>,
mean_in_degree: usize,
max_in_degree: usize,
min_radius_q: CKMS<f32>,
}
impl<F: Float> KGraphStat<F> {
pub fn get_density_index(&self) -> Vec<F> {
self.ranges.iter().map(|x| x.1.recip()).collect()
}
pub fn get_max_in_degree(&self) -> usize {
self.max_in_degree
}
pub fn get_mean_in_degree(&self) -> usize {
self.mean_in_degree
}
pub fn get_in_degrees(&self) -> &Vec<u32> {
&self.in_degrees
}
pub fn get_radius_at_quantile(&self, frac: f64) -> f32 {
if (0. ..=1.).contains(&frac) {
self.min_radius_q.query(frac).unwrap().1
} else {
0.
}
}
}
#[derive(Clone)]
pub struct KGraph<F> {
pub(crate) max_nbng: usize,
pub(crate) nbnodes: usize,
pub(crate) neighbours: Vec<Vec<OutEdge<F>>>,
pub(crate) node_set: IndexSet<DataId>,
}
impl<F> Default for KGraph<F>
where
F: FromPrimitive + Float + std::fmt::UpperExp + Sync + Send + std::iter::Sum,
{
fn default() -> Self {
Self::new()
}
}
impl<F> KGraph<F>
where
F: FromPrimitive + Float + std::fmt::UpperExp + Sync + Send + std::iter::Sum,
{
pub fn new() -> Self {
let neighbours_init = Vec::<Vec<OutEdge<F>>>::new();
KGraph {
max_nbng: 0,
nbnodes: 0,
neighbours: neighbours_init,
node_set: IndexSet::new(),
}
}
pub fn get_nb_nodes(&self) -> usize {
self.nbnodes
}
pub fn get_max_nbng(&self) -> usize {
self.max_nbng
}
pub fn get_neighbours(&self) -> &Vec<Vec<OutEdge<F>>> {
&self.neighbours
}
pub fn get_out_edges_by_idx(&self, node: NodeIdx) -> &Vec<OutEdge<F>> {
&self.neighbours[node]
}
pub fn compute_max_edge(&self) -> Vec<(usize, f64)> {
let neighbours = &self.neighbours;
let mut max_edge_length: Vec<(usize, f64)> = (0..neighbours.len())
.into_par_iter()
.map(|n| -> (usize, f64) {
let mut node_edge_length: f64 = 0.;
for edge in &neighbours[n] {
node_edge_length = node_edge_length.max(edge.weight.to_f64().unwrap());
}
(n, node_edge_length)
})
.collect();
max_edge_length.sort_unstable_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
max_edge_length
}
pub fn get_out_edges_by_data_id(
&self,
data_id: &DataId,
) -> Result<&Vec<OutEdge<F>>, anyhow::Error> {
let idx = self.get_idx_from_dataid(data_id);
if idx.is_none() {
return Err(anyhow!("bad data_id"));
}
let idx = idx.unwrap();
Ok(self.get_out_edges_by_idx(idx))
}
pub fn intrinsic_dim_at_data_id(&self, data_id: &DataId) -> Result<f64, anyhow::Error> {
let edges_res = self.get_out_edges_by_data_id(data_id);
if edges_res.is_err() {
return Err(edges_res.err().unwrap());
}
let edges: &Vec<OutEdge<F>> = edges_res.unwrap();
intrinsic_dimension_from_edges::<F>(edges)
}
pub fn estimate_intrinsic_dim(
&self,
sampling_size: usize,
) -> Result<(f64, f64), anyhow::Error> {
let mut dims = Vec::<f64>::with_capacity(sampling_size);
let nb_nodes = self.get_nb_nodes();
let mut rng = rand::rng();
let between = rand_distr::Uniform::new(0, nb_nodes).unwrap();
for _ in 0..sampling_size {
let node = between.sample(&mut rng);
let edges = &self.neighbours[node];
let dim_res = intrinsic_dimension_from_edges(edges);
if let Ok(dim) = dim_res {
dims.push(dim);
}
}
if dims.is_empty() {
log::error!("could not sample dimension");
return Err(anyhow!("could not sample points"));
}
let mean_dim: f64 = dims.iter().sum::<f64>() / dims.len() as f64;
let mut sigma = dims
.iter()
.fold(0., |acc, d| acc + (d - mean_dim) * (d - mean_dim));
sigma = (sigma / dims.len() as f64).sqrt();
log::debug!(
" mean dimension : {:.3e}, sigma : {:.3e}, nb_points used: {}",
mean_dim,
sigma,
dims.len()
);
Ok((mean_dim, sigma))
}
pub fn estimate_intrinsic_dim_2nn(
&self,
sampling_size_arg: usize,
) -> Result<f64, anyhow::Error> {
let neighbours = self.get_neighbours();
let size = neighbours.len();
let mut rng: Xoshiro256PlusPlus = Xoshiro256PlusPlus::seed_from_u64(4664397);
let sampling_size = size.min(sampling_size_arg);
let sampled = unweighted_reservoir(sampling_size, 0..size, &mut rng);
let mut ratios = Vec::<f64>::with_capacity(sampling_size);
sampled.into_iter().for_each(|n| {
let r1 = neighbours[n][0].weight.to_f64().unwrap();
let r2 = neighbours[n][1].weight.to_f64().unwrap();
assert!(r1 <= r2 && r1 > 0.);
ratios.push(r2 / r1);
});
let mut permutation = Permutation::one(sampling_size);
permutation.assign_from_sort_by(&ratios, |a, b| a.partial_cmp(&b).unwrap());
let direct_permutation = permutation.normalize(false); let mut cumulant: Vec<f64> = vec![0.; sampling_size];
for i in 0..sampling_size {
let rank = direct_permutation.apply_idx(i);
cumulant[rank] = rank as f64 / sampling_size as f64;
if i <= 20 {
log::debug!(
"i: {}, {:.3e}, rank : {}, cumul : {:.3e}",
i,
ratios[i],
rank,
cumulant[rank]
);
}
}
let mut num: f64 = 0.0;
let mut den: f64 = 0.0;
for i in 0..sampling_size {
let ratio = ratios[i];
den += ratio.ln() * ratio.ln();
let ipermut = direct_permutation.apply_idx(i);
num += -ratio.ln() * (1. - cumulant[ipermut]).ln();
}
log::debug!("num : {:.3e}, den = {:.3e}", num, den);
Ok(num / den)
}
pub fn get_data_id_from_idx(&self, index: usize) -> Option<&DataId> {
self.node_set.get_index(index)
}
pub fn get_idx_from_dataid(&self, data_id: &DataId) -> Option<usize> {
self.node_set.get_index_of(data_id)
}
#[allow(unused)]
pub(crate) fn get_indexset(&self) -> &IndexSet<DataId> {
&self.node_set
}
pub(crate) fn to_ripser_sparse_dist(
&self,
writer: &mut dyn Write,
) -> Result<(), anyhow::Error> {
log::debug!("in to_ripser_sparse_dist");
for i in 0..self.nbnodes {
for n in &self.neighbours[i] {
writeln!(writer, "{} {} {:.5E}", i, n.node, n.weight)?;
writeln!(writer, "{} {} {:.5E}", n.node, i, n.weight)?;
}
}
log::debug!("to_ripser_sparse_dist finished");
Ok(())
}
pub fn get_kraph_stats(&self) -> KGraphStat<F> {
let mut in_degrees: Vec<u32> = (0..self.nbnodes).map(|_| 0).collect();
let mut ranges = Vec::<RangeNghb<F>>::with_capacity(self.nbnodes);
let mut max_max_r = F::zero();
let mut min_min_r = F::max_value();
let mut quant = CKMS::<f32>::new(0.001);
for i in 0..self.neighbours.len() {
if !self.neighbours[i].is_empty() {
let min_r = self.neighbours[i][0].weight;
let max_r = self.neighbours[i][self.neighbours[i].len() - 1].weight;
quant.insert(F::to_f32(&min_r).unwrap());
max_max_r = max_max_r.max(max_r);
min_min_r = min_min_r.min(min_r);
ranges.push(RangeNghb(min_r, max_r));
for j in 0..self.neighbours[i].len() {
in_degrees[self.neighbours[i][j].node] += 1;
}
}
}
let mut max_in_degree = 0;
let mut mean_in_degree: f32 = 0.;
for in_d in &in_degrees {
max_in_degree = max_in_degree.max(*in_d);
mean_in_degree += *in_d as f32;
}
if !in_degrees.is_empty() {
mean_in_degree /= in_degrees.len() as f32;
}
log::info!("\n minimal graph statistics \n");
log::info!("\t max in degree : {:.2e}", max_in_degree);
log::info!("\t mean in degree : {:.2e}", mean_in_degree);
log::info!("\t max max range : {:.2e} ", max_max_r.to_f32().unwrap());
log::info!("\t min min range : {:.2e} ", min_min_r.to_f32().unwrap());
if quant.count() > 0 {
log::info!(
"min radius quantile at 0.05 : {:.2e} , 0.5 : {:.2e}, 0.95 : {:.2e}, 0.99 : {:.2e}",
quant.query(0.05).unwrap().1,
quant.query(0.5).unwrap().1,
quant.query(0.95).unwrap().1,
quant.query(0.99).unwrap().1
);
}
KGraphStat {
ranges,
in_degrees,
mean_in_degree: mean_in_degree.round() as usize,
max_in_degree: max_in_degree as usize,
min_radius_q: quant,
}
} }
pub fn kgraph_from_hnsw_all<T, D, F>(hnsw: &Hnsw<T, D>, nbng: usize) -> anyhow::Result<KGraph<F>>
where
T: Clone + Send + Sync,
D: Distance<T> + Send + Sync,
F: Float + FromPrimitive,
{
log::debug!("entering kgraph_from_hnsw_all");
let mut nb_point_below_nbng = 0;
let mut mean_deficient_neighbour_size: usize = 0;
let mut minimum_nbng = nbng;
let mut mean_nbng = 0u64;
let max_nb_conn = hnsw.get_max_nb_connection() as usize; if max_nb_conn < nbng {
log::info!(
"init_from_hnsw_all: number of neighbours asked {} must be less than hnsw max_nb_connection : {} ",
nbng,
max_nb_conn
);
log::warn!(
"init_from_hnsw_all: number of neighbours asked {} must be less than hnsw max_nb_connection : {} ",
nbng,
max_nb_conn
);
} else {
log::info!(
"kgraph_from_hnsw_all construction with {} nb_neighbours",
nbng
);
}
let point_indexation = hnsw.get_point_indexation();
let nb_point = point_indexation.get_nb_point();
let mut node_set = IndexSet::<DataId>::with_capacity(nb_point);
let mut neighbours = Vec::<Vec<OutEdge<F>>>::with_capacity(nb_point);
for _i in 0..nb_point {
neighbours.push(Vec::<OutEdge<F>>::new());
}
let point_indexation = hnsw.get_point_indexation();
let point_iter = point_indexation.into_iter();
for point in point_iter {
let point_id = point.get_origin_id();
let (index, _) = node_set.insert_full(point_id);
let neighbours_hnsw = point.get_neighborhood_id();
let nb_layer = neighbours_hnsw.len();
let mut vec_tmp = Vec::<OutEdge<F>>::with_capacity(max_nb_conn * nb_layer);
for neighbours_layer in neighbours_hnsw {
for neighbour in &neighbours_layer {
let (neighbour_idx, _) = node_set.insert_full(neighbour.get_origin_id());
assert!(index != neighbour_idx);
vec_tmp.push(OutEdge::<F> {
node: neighbour_idx,
weight: F::from_f32(neighbour.distance).unwrap(),
});
}
}
vec_tmp.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Less));
assert!(vec_tmp.len() <= 1 || vec_tmp[0].weight <= vec_tmp[1].weight); if vec_tmp.len() < nbng {
nb_point_below_nbng += 1;
mean_deficient_neighbour_size += vec_tmp.len();
log::trace!(
"neighbours must have {} neighbours, point {} got only {}",
nbng,
point_id,
vec_tmp.len()
);
if vec_tmp.is_empty() {
let p_id = point.get_point_id();
log::error!(
"kgraph_from_hnsw_all: graph will not be connected, isolated point at layer {} , pos in layer : {} \n",
p_id.0,
p_id.1
);
log::error!(
"kgraph_from_hnsw_all: graph will not be connected, isolated point at layer {} , pos in layer : {} ",
p_id.0,
p_id.1
);
return Err(anyhow!(
"kgraph_from_hnsw_all: graph will not be connected, isolated point at layer {} , pos in layer : {} ",
p_id.0,
p_id.1
));
}
}
vec_tmp.truncate(nbng);
mean_nbng += vec_tmp.len() as u64;
minimum_nbng = minimum_nbng.min(vec_tmp.len());
neighbours[index] = vec_tmp;
}
let nbnodes = neighbours.len();
assert_eq!(neighbours.len(), nb_point);
log::trace!("KGraph::exiting init_from_hnsw_all");
log::info!(
"mean number of neighbours obtained = {:.3e}, minimal number of neighbours {}",
mean_nbng as f64 / nb_point as f64,
minimum_nbng
);
if nb_point_below_nbng > 0 {
log::info!(
"number of points with less than : {} neighbours = {}, mean size for deficient neighbourhhod {:.3e}",
nbng,
nb_point_below_nbng,
mean_deficient_neighbour_size as f64 / nb_point_below_nbng as f64
);
}
let mean_nbng = mean_nbng as f64 / nb_point as f64;
if mean_nbng < nbng as f64 {
log::warn!(" mean number of neighbours obtained : {:.3e}", mean_nbng);
log::warn!(" possibly use hnsw.set_keeping_pruned(true)");
log::warn!(" mean number of neighbours obtained : {:.3e}", mean_nbng);
log::warn!(" possibly use hnsw.set_keeping_pruned(true)");
}
Ok(KGraph {
max_nbng: nbng,
nbnodes,
neighbours,
node_set,
})
}
pub fn kgraph_from_hnsw_layer<T, D, F>(
hnsw: &Hnsw<T, D>,
nbng: usize,
layer: usize,
) -> std::result::Result<KGraph<F>, usize>
where
T: Clone + Send + Sync,
D: Distance<T> + Send + Sync,
F: Float + FromPrimitive,
{
log::trace!("init_from_hnsw_layer");
let max_nbng = nbng;
let mut nb_point_below_nbng: usize = 0;
let mut mean_deficient_neighbour_size: usize = 0;
let mut minimum_nbng = nbng;
let mut mean_nbng = 0u64;
let max_nb_conn = hnsw.get_max_nb_connection() as usize;
let max_level_observed = hnsw.get_max_level_observed() as usize;
let mut nb_point = 0;
for l in (layer..=max_level_observed).rev() {
nb_point += hnsw.get_point_indexation().get_layer_nb_point(l);
}
log::trace!(
"init_from_hnsw_layer down to layer {} collecting nbpoint : {}",
layer,
nb_point
);
let mut node_set = IndexSet::<DataId>::with_capacity(nb_point);
let mut neighbours = Vec::<Vec<OutEdge<F>>>::with_capacity(nb_point);
for _i in 0..nb_point {
neighbours.push(Vec::<OutEdge<F>>::new());
}
let mut nb_point_collected = 0;
for l in (layer..=max_level_observed).rev() {
let layer_iter = hnsw.get_point_indexation().get_layer_iterator(l);
for point in layer_iter {
let origin_id = point.get_origin_id();
let p_id = point.get_point_id();
let (index, _) = node_set.insert_full(origin_id);
if index >= nb_point {
log::trace!(
"init_from_hnsw_layer point_id {} index {}",
origin_id,
index
);
assert!(index < nb_point);
}
let neighbours_hnsw = point.get_neighborhood_id();
let mut vec_tmp = Vec::<OutEdge<F>>::with_capacity(max_nb_conn);
for neighbours_layer in neighbours_hnsw
.iter()
.take(max_level_observed + 1)
.skip(layer)
{
for neighbour in neighbours_layer {
let n_origin_id = neighbour.get_origin_id();
let n_p_id = neighbour.p_id;
if n_p_id.0 as usize >= l {
let (neighbour_idx, _) = node_set.insert_full(n_origin_id);
vec_tmp.push(OutEdge::<F> {
node: neighbour_idx,
weight: F::from_f32(neighbour.distance).unwrap(),
});
}
} } vec_tmp.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Less));
assert!(vec_tmp.len() <= 1 || vec_tmp[0].weight <= vec_tmp[1].weight); if vec_tmp.len() < nbng {
nb_point_below_nbng += 1;
mean_deficient_neighbour_size += vec_tmp.len();
log::trace!(
"neighbours must have {} neighbours, got only {}. layer {} , pos in layer : {}",
nbng,
vec_tmp.len(),
p_id.0,
p_id.1
);
if vec_tmp.is_empty() {
let p_id = point.get_point_id();
log::warn!(
" graph will not be connected, isolated point at layer {} , pos in layer : {} ",
p_id.0,
p_id.1
);
node_set.swap_remove(&index);
continue;
}
}
vec_tmp.truncate(nbng);
nb_point_collected += 1;
mean_nbng += vec_tmp.len() as u64;
minimum_nbng = minimum_nbng.min(vec_tmp.len());
neighbours[index] = vec_tmp;
} }
let nbnodes = neighbours.len();
assert_eq!(nbnodes, nb_point);
log::trace!("KGraph::exiting init_from_hnsw_layer");
log::trace!("collected {} points", nb_point_collected);
let mean_nbng = mean_nbng as f64 / nb_point_collected as f64;
log::info!(
"mean number of neighbours obtained = {:.3e} minimal number of neighbours {}",
mean_nbng,
minimum_nbng
);
if nb_point_below_nbng > 0 {
log::info!(
"number of points with less than : {} neighbours = {}, mean size for deficient neighbourhhod {:.3e}",
nbng,
nb_point_below_nbng,
mean_deficient_neighbour_size as f64 / nb_point_below_nbng as f64
);
}
if mean_nbng < nbng as f64 {
log::warn!(" mean number of neighbours obtained : {:.3e}", mean_nbng);
log::warn!(" possibly use hnsw.reset_keeping_pruned(true)");
}
Ok(KGraph {
max_nbng,
nbnodes,
neighbours,
node_set,
})
}
#[cfg(test)]
#[allow(clippy::range_zip_with_len)]
mod tests {
use super::*;
use std::fs::OpenOptions;
use std::io::BufWriter;
use std::path::Path;
use rand::distr::Uniform;
use rand::prelude::*;
use crate::fromhnsw::hubness;
#[cfg(test)]
fn log_init_test() {
let res = env_logger::builder().is_test(true).try_init();
if res.is_err() {
println!("could not init log");
}
}
fn gen_rand_data_f32(nb_elem: usize, dim: usize) -> Vec<Vec<f32>> {
let mut data = Vec::<Vec<f32>>::with_capacity(nb_elem);
let mut rng = rand::rng();
let unif = Uniform::<f32>::new(0., 1.).unwrap();
for i in 0..nb_elem {
let val = 10. * i as f32 * rng.sample(unif);
let v: Vec<f32> = (0..dim).map(|_| val * rng.sample(unif)).collect();
data.push(v);
}
data
}
#[test]
fn test_full_hnsw() {
log_init_test();
let nb_elem = 20000;
let dim = 30;
let knbn = 20;
log::debug!("test_full_hnsw");
log::debug!("\n\n test_serial nb_elem {:?}", nb_elem);
let data = gen_rand_data_f32(nb_elem, dim);
let data_with_id: Vec<(&Vec<f32>, usize)> = data
.iter()
.zip(0..data.len())
.collect::<Vec<(&Vec<f32>, usize)>>();
let ef_c = 50;
let max_nb_connection = 50;
let nb_layer = 16.min((nb_elem as f32).ln().trunc() as usize);
let mut hns =
Hnsw::<f32, DistL1>::new(max_nb_connection, nb_elem, nb_layer, ef_c, DistL1 {});
hns.set_keeping_pruned(true);
hns.parallel_insert(&data_with_id);
hns.dump_layer_info();
log::info!("calling kgraph.init_from_hnsw_all");
let kgraph: KGraph<f32> = kgraph_from_hnsw_all(&hns, knbn).unwrap();
log::info!("minimum number of neighbours {}", kgraph.get_max_nbng());
let _kgraph_stats = kgraph.get_kraph_stats();
let id = 10;
let dimension = kgraph.intrinsic_dim_at_data_id(&id).unwrap();
log::debug!("dimension around point : {}, dim = {:.3e}", id, dimension);
log::info!(
"\n dimension around point : {}, dim = {:.3e}",
id,
dimension
);
let dimension = kgraph.estimate_intrinsic_dim(10000);
assert!(dimension.is_ok());
let dimension = dimension.unwrap();
log::info!(
"\n estimation of dimension : {:.3e}, sigma : {:.3e} ",
dimension.0,
dimension.1
);
log::debug!(
"\n estimation of dimension : {:.3e}, sigma : {:.3e} ",
dimension.0,
dimension.1
);
let hubness = self::hubness::Hubness::new(&kgraph);
let s3 = hubness.get_standard3m();
log::info!(" estimation of hubness : {:.3e}", s3);
}
#[test]
fn test_layer_hnsw() {
log_init_test();
let nb_elem = 80000;
let dim = 30;
let knbn = 20;
log::debug!("\n\n test_serial nb_elem {:?}", nb_elem);
let data = gen_rand_data_f32(nb_elem, dim);
let data_with_id: Vec<(&Vec<f32>, usize)> = data.iter().zip(0..data.len()).collect();
let ef_c = 50;
let layer = 1;
let max_nb_connection = 64;
let nb_layer = 16.min((nb_elem as f32).ln().trunc() as usize);
let mut hns =
Hnsw::<f32, DistL1>::new(max_nb_connection, nb_elem, nb_layer, ef_c, DistL1 {});
hns.set_keeping_pruned(true);
hns.parallel_insert(&data_with_id);
hns.dump_layer_info();
log::info!("calling kgraph.init_from_hnsw_layer");
let kgraph: KGraph<f32> = kgraph_from_hnsw_layer(&hns, knbn, layer).unwrap();
log::info!("minimum number of neighbours {}", kgraph.get_max_nbng());
let _kgraph_stats = kgraph.get_kraph_stats();
let fname = "test_ripser_output";
log::info!("testing ripser output in file : {}", fname);
let path = Path::new(fname);
log::debug!("in to_ripser_sparse_dist : fname : {}", path.display());
let fileres = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path);
let file;
if fileres.is_ok() {
file = fileres.unwrap();
let mut bufwriter = BufWriter::new(file);
let res = kgraph.to_ripser_sparse_dist(&mut bufwriter);
if res.is_err() {
log::error!("kgraph.to_ripser_sparse_dist in {} failed", fname);
}
} else {
log::error!("cannot open {}", path.display());
assert_eq!(1, 0);
}
}
#[test]
fn test_small_indexset() {
let _ = env_logger::builder().is_test(true).try_init();
let size = 30;
let mut idxset = IndexSet::<usize>::with_capacity(size);
let from = 10000;
let between = Uniform::new(from, from + size).unwrap();
let mut rng = rand::rng();
for _i in 0..100000 {
let xsi = between.sample(&mut rng);
let (idx, _) = idxset.insert_full(xsi);
assert!(idx < size);
}
} }