use num_traits::{Float, NumAssign};
use ndarray::{Array1, Array2, ArrayView1};
use ndarray_linalg::{Lapack, Scalar};
use quantiles::ckms::CKMS;
use csv::Writer;
use crate::prelude::write_csv_labeled_array2;
use rayon::prelude::*;
use parking_lot::RwLock;
use std::sync::Arc;
use rand::{Rng, thread_rng};
use rand::distributions::Uniform;
use rand_distr::WeightedAliasIndex;
use rand_distr::{Normal, Distribution};
use indexmap::set::*;
use std::time::{Duration,SystemTime};
use cpu_time::ProcessTime;
use hnsw_rs::prelude::*;
use crate::fromhnsw::{kgraph::KGraph, kgraph::kgraph_from_hnsw_all, kgproj::*};
use crate::embedparams::*;
use crate::diffmaps::*;
use crate::tools::{dichotomy::*,nodeparam::*};
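// lower bound on edge probabilities: clamping to this value keeps cross-entropy terms and repulsion coefficients finite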
const PROBA_MIN: f32 = 1.0E-5;
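/// Euclidean (L2) distance between two slices of equal length.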
#[inline]
fn distl2<F:Float+ Lapack + Scalar + ndarray::ScalarOperand + Send + Sync>(a: &[F], b: &[F]) -> F {
assert_eq!(a.len(), b.len());
let norm : F = a.iter().zip(b.iter()).map(|t| (*t.0 - *t.1 ) * (*t.0 - *t.1)).sum();
num_traits::Float::sqrt(norm)
}
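/// Zero-sized adapter exposing `distl2` through the `hnsw_rs` `Distance` trait,
/// so that embedded points can themselves be indexed in a Hnsw structure.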
struct DistL2F;
impl <F> Distance<F> for DistL2F
where F:Float+ Lapack + Scalar + ndarray::ScalarOperand + Send + Sync {
fn eval(&self, va:&[F], vb: &[F]) -> f32 {
distl2::<F>(va, vb).to_f32().unwrap()
}
}
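/// Drives the embedding. Exactly one of `kgraph` (one-step embedding) or `hkgraph`
/// (two-step embedding from a graph projection) is set; `initial_space` holds the edge
/// probabilities of the original graph, and `embedding` the optimized coordinates.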
pub struct Embedder<'a,F> {
kgraph: Option<&'a KGraph<F>>,
hkgraph: Option<&'a KGraphProjection<F>>,
parameters : EmbedderParams,
initial_space: Option<NodeParams>,
initial_embedding : Option<Array2<F>>,
embedding: Option<Array2<F>>,
}
impl<'a,F> Embedder<'a,F>
where
F: Float + Lapack + Scalar + ndarray::ScalarOperand + Send + Sync,
{
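/// Builds an embedder from a k-NN graph and embedding parameters.
/// A minimal usage sketch, assuming a `KGraph` already extracted from a Hnsw structure
/// (as in the test at the end of this file):
/// ```ignore
/// let mut params = EmbedderParams::default();
/// params.asked_dim = 2;
/// let mut embedder = Embedder::new(&kgraph, params);
/// embedder.embed().unwrap();
/// let coords = embedder.get_embedded_reindexed();
/// ```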
pub fn new(kgraph : &'a KGraph<F>, parameters : EmbedderParams) -> Self {
Embedder::<F>{kgraph : Some(kgraph), hkgraph : None, parameters , initial_space:None,
initial_embedding : None, embedding:None}
}
pub fn from_hkgraph(graph_projection : &'a KGraphProjection<F>, parameters : EmbedderParams) -> Self {
Embedder::<F>{kgraph : None, hkgraph : Some(graph_projection), parameters , initial_space:None,
initial_embedding : None, embedding:None}
}
pub fn get_asked_dimension(&self) -> usize {
self.parameters.asked_dim
}
pub fn get_scale_rho(&self) -> f64 {
self.parameters.scale_rho
}
pub fn get_b(&self) -> f64 {
self.parameters.b
}
pub fn get_grad_step(&self) -> f64 {
self.parameters.grad_step
}
pub fn get_nb_grad_batch(&self) -> usize {
self.parameters.nb_grad_batch
}
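/// Dispatches to `one_step_embed` when built from a plain graph, or to `h_embed`
/// when built from a two-layer graph projection.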
pub fn embed(&mut self) -> Result<usize, usize> {
if self.kgraph.is_some() {
log::info!("doing one step embedding");
return self.one_step_embed();
}
else {
log::info!("doing 2 step embedding");
return self.h_embed();
}
}
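/// Two-step (hierarchical) embedding.
/// First embeds the small upper-layer graph with `grad_factor` times more gradient batches and a
/// unit gradient step. Remaining nodes are then initialized near the embedding of their projected
/// neighbour, perturbed by a clipped Gaussian whose amplitude depends on the projection distance
/// relative to its median, and the full graph is optimized.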
pub fn h_embed(&mut self) -> Result<usize, usize> {
if self.hkgraph.is_none() {
log::error!("Embedder::h_embed , graph projection is none");
return Err(1);
}
log::debug!("in h_embed");
let graph_projection = self.hkgraph.as_ref().unwrap();
log::info!(" embedding first (small) graph");
let mut first_step_parameters = self.parameters;
first_step_parameters.nb_grad_batch = self.parameters.grad_factor * self.parameters.nb_grad_batch;
log::info!("nb initial batch : {}", first_step_parameters.nb_grad_batch);
first_step_parameters.grad_step = 1.;
let mut embedder_first_step = Embedder::new(graph_projection.get_small_graph(), first_step_parameters);
let cpu_start = ProcessTime::now();
let sys_start = SystemTime::now();
let res_first = embedder_first_step.one_step_embed();
if res_first.is_err() {
log::error!("Embedder::h_embed first step failed");
return res_first;
}
println!(" first step embedding sys time(ms) {:.2e} cpu time(ms) {:.2e}", sys_start.elapsed().unwrap().as_millis(), cpu_start.elapsed().as_millis());
let large_graph = graph_projection.get_large_graph();
log::info!("computing proba edges for large graph ...");
self.initial_space = Some(to_proba_edges(large_graph, self.parameters.scale_rho as f32, self.parameters.beta as f32));
let nb_nodes_large = large_graph.get_nb_nodes();
let first_embedding = embedder_first_step.get_embedded().unwrap();
let quant = graph_projection.get_projection_distance_quant();
if quant.count() > 0 {
println!(" projection distance quantile at 0.05 : {:.2e} , 0.5 : {:.2e}, 0.95 : {:.2e}, 0.99 : {:.2e}",
quant.query(0.05).unwrap().1, quant.query(0.5).unwrap().1,
quant.query(0.95).unwrap().1, quant.query(0.99).unwrap().1);
};
let dim = self.parameters.get_dimension();
let mut second_step_init = Array2::<F>::zeros((nb_nodes_large, dim));
log::info!("doing projection");
let (nb_nodes_small, _) = first_embedding.dim();
let mut rng = thread_rng();
for i in 0..nb_nodes_small {
for j in 0..dim {
second_step_init[[i,j]] = first_embedding[[i,j]];
}
}
let median_dist = quant.query(0.5).unwrap().1;
let normal = Normal::<f32>::new(0., 1.0).unwrap();
for i in nb_nodes_small..nb_nodes_large {
let projected_edge = graph_projection.get_projection_by_nodeidx(&i);
let ratio = projected_edge.weight.to_f32().unwrap() / median_dist;
let correction = (ratio/dim as f32).sqrt();
for j in 0..dim {
let clipped_correction = clip(correction * normal.sample(&mut rng), 2.);
second_step_init[[i,j]] = first_embedding[[projected_edge.node,j]] + F::from(clipped_correction).unwrap();
}
}
log::debug!("projection done");
self.initial_embedding = Some(second_step_init);
log::info!("optimizing second step");
let embedding_res = self.entropy_optimize(&self.parameters, self.initial_embedding.as_ref().unwrap());
println!(" first + second step embedding sys time(s) {:.2e} cpu time(s) {:.2e}", sys_start.elapsed().unwrap().as_secs(), cpu_start.elapsed().as_secs());
match embedding_res {
Ok(embedding) => {
self.embedding = Some(embedding);
return Ok(1);
}
_ => {
log::error!("Embedder::embed : embedding optimization failed");
return Err(1);
}
}
}
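/// One-step embedding: converts graph edges to probabilities, initializes by a diffusion-maps
/// embedding rescaled to a unit box (if `dmap_init` is set) or at random, then runs the
/// cross-entropy optimization.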
pub fn one_step_embed(&mut self) -> Result<usize, usize> {
log::info!("doing 1 step embedding");
self.parameters.log();
let graph_to_embed = self.kgraph.unwrap();
self.initial_space = Some(to_proba_edges(graph_to_embed, self.parameters.scale_rho as f32, self.parameters.beta as f32));
let mut initial_embedding;
if self.parameters.dmap_init {
let cpu_start = ProcessTime::now();
let sys_start = SystemTime::now();
initial_embedding = get_dmap_embedding(self.initial_space.as_ref().unwrap(), self.parameters.get_dimension(), None);
println!(" dmap initialization sys time(ms) {:.2e} cpu time(ms) {:.2e}", sys_start.elapsed().unwrap().as_millis(), cpu_start.elapsed().as_millis());
set_data_box(&mut initial_embedding, 1.);
}
else {
initial_embedding = self.get_random_init(1.);
}
let embedding_res = self.entropy_optimize(&self.parameters, &initial_embedding);
self.initial_embedding = Some(initial_embedding);
match embedding_res {
Ok(embedding) => {
self.embedding = Some(embedding);
return Ok(1);
}
_ => {
log::error!("Embedder::embed : embedding optimization failed");
return Err(1);
}
}
}
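/// Accessors to the embedded coordinates. Rows are indexed by internal `NodeIdx`;
/// the `_reindexed` variants reorder rows by original `DataId`.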
pub fn get_embedded(&self) -> Option<&Array2<F>> {
return self.embedding.as_ref();
}
pub fn get_embedded_reindexed(&self) -> Array2<F> {
let embedded = self.embedding.as_ref().unwrap();
let (nbrow, dim) = embedded.dim();
let mut reindexed = Array2::<F>::zeros((nbrow, dim));
let kgraph = if self.hkgraph.is_some()
{ self.hkgraph.as_ref().unwrap().get_large_graph() }
else {self.kgraph.as_ref().unwrap() };
for i in 0..nbrow {
let row = embedded.row(i);
let origin_id = kgraph.get_data_id_from_idx(i).unwrap();
for j in 0..dim {
reindexed[[*origin_id,j]] = row[j];
}
}
return reindexed;
}
pub fn get_embedded_by_dataid(&self, data_id : &DataId) -> ArrayView1<F> {
let kgraph = if self.hkgraph.is_some()
{ self.hkgraph.as_ref().unwrap().get_large_graph() }
else {self.kgraph.as_ref().unwrap() };
let data_idx = kgraph.get_idx_from_dataid(data_id).unwrap();
self.embedding.as_ref().unwrap().row(data_idx)
}
pub fn get_embedded_by_nodeid(&self, node : NodeIdx) -> ArrayView1<F> {
self.embedding.as_ref().unwrap().row(node)
}
pub fn get_initial_embedding(&self) -> Option<&Array2<F>> {
return self.initial_embedding.as_ref();
}
pub fn get_initial_embedding_reindexed(&self) -> Array2<F> {
let embedded = self.initial_embedding.as_ref().unwrap();
let (nbrow, dim) = embedded.dim();
let mut reindexed = Array2::<F>::zeros((nbrow, dim));
let kgraph = if self.hkgraph.is_some()
{ self.hkgraph.as_ref().unwrap().get_large_graph() }
else {self.kgraph.as_ref().unwrap() };
for i in 0..nbrow {
let row = embedded.row(i);
let origin_id = kgraph.get_data_id_from_idx(i).unwrap();
for j in 0..dim {
reindexed[[*origin_id,j]] = row[j];
}
}
return reindexed;
}
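/// Random initialization: each coordinate drawn uniformly in [-size/2, size/2].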
fn get_random_init(&mut self, size:f32) -> Array2<F> {
log::trace!("Embedder get_random_init with size {:.2e}", size);
let nb_nodes = self.initial_space.as_ref().unwrap().get_nb_nodes();
let mut initial_embedding = Array2::<F>::zeros((nb_nodes, self.get_asked_dimension()));
let unif = Uniform::<f32>::new(-size/2. , size/2.);
let mut rng = thread_rng();
for i in 0..nb_nodes {
for j in 0..self.get_asked_dimension() {
initial_embedding[[i,j]] = F::from(rng.sample(unif)).unwrap();
}
}
initial_embedding
}
fn get_transformed_kgraph(&self) -> Option<Vec<(usize,Vec<OutEdge<F>>) >> {
let kgraph : &KGraph<F>;
if self.kgraph.is_some() {
kgraph = self.kgraph.unwrap();
log::info!("found kgraph");
}
else if self.hkgraph.is_some() {
kgraph = self.hkgraph.as_ref().unwrap().get_large_graph();
log::info!("found kgraph from projection");
}
else {
log::info!("could not find kgraph");
std::process::exit(1);
}
let neighbours = kgraph.get_neighbours();
let transformed_neighbours : Vec::<(usize,Vec<OutEdge<F>>)> = (0..neighbours.len()).into_par_iter().map( |n| -> (usize, Vec<OutEdge<F>>)
{
let node_embedded = self.get_embedded_by_nodeid(n);
let mut transformed_neighborhood = Vec::<OutEdge<F>>::with_capacity(neighbours[n].len());
let mut node_edge_length = F::max_value();
for edge in &neighbours[n] {
let ext_embedded = self.get_embedded_by_nodeid(edge.node);
node_edge_length = distl2(node_embedded.as_slice().unwrap(), ext_embedded.as_slice().unwrap()).min(node_edge_length);
transformed_neighborhood.push(OutEdge::<F>::new(edge.node, node_edge_length));
}
transformed_neighborhood.sort_unstable_by(|a,b| a.weight.partial_cmp(&b.weight).unwrap());
return (n, transformed_neighborhood);
}
).collect();
let mut sorted_neighbours_info = Vec::<(usize, Vec<OutEdge<F>>)>::with_capacity(neighbours.len());
for i in 0..neighbours.len() {
sorted_neighbours_info.push((i, Vec::<OutEdge<F>>::new()));
}
for i in 0.. transformed_neighbours.len() {
let slot = transformed_neighbours[i].0;
sorted_neighbours_info[slot] = transformed_neighbours[i].clone();
}
return Some(sorted_neighbours_info);
}
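/// Builds a Hnsw index over the embedded points, extracts a `nbng`-neighbour graph from it,
/// and returns for each node the length of the largest edge of its embedded neighbourhood.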
fn get_max_edge_length_embedded_kgraph(&self, nbng : usize) -> Option<Vec<(usize,f64)>> {
let embedding = self.embedding.as_ref().unwrap();
let max_nb_connection = nbng;
let ef_c = 64;
let nb_nodes = embedding.nrows();
let nb_layer = 16.min((nb_nodes as f32).ln().trunc() as usize);
let mut hnsw = Hnsw::<F, DistL2F>::new(max_nb_connection, nb_nodes, nb_layer, ef_c, DistL2F{});
hnsw.set_keeping_pruned(true);
let vectors : Vec<ArrayView1<F>> = (0..nb_nodes).map(|i| embedding.row(i)).collect();
let mut data_with_id = Vec::<(&[F], usize)>::with_capacity(nb_nodes);
for i in 0..nb_nodes {
data_with_id.push((vectors[i].as_slice().unwrap(), i));
}
hnsw.parallel_insert_slice(&data_with_id);
let optimal_graph : Result<KGraph<F>, usize> = kgraph_from_hnsw_all(&hnsw, nbng);
if optimal_graph.is_err() {
log::error!("could not compute optimal graph");
return None;
}
let optimal_graph = optimal_graph.unwrap();
let optimal_max_edge = optimal_graph.compute_max_edge();
Some(optimal_max_edge)
}
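/// Heuristic quality estimate: for each node, compares distances to its original neighbours
/// (recomputed in embedded space) with the radius of its embedded `nbng`-neighbourhood, and prints
/// quantiles of radii, matches and distance ratios. First-neighbour distances are dumped to
/// `first_dist.csv`. The numeric return value is currently a placeholder (always 0.).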
#[allow(unused)]
pub fn get_quality_estimate_from_edge_length(&self, nbng : usize) -> Option<f64> {
let cpu_start = ProcessTime::now();
let sys_now = SystemTime::now();
let quality = 0f64;
let transformed_kgraph = self.get_transformed_kgraph();
if transformed_kgraph.is_none() {
log::error!("cannot ask for embedded quality before embedding");
std::process::exit(1);
}
let transformed_kgraph = transformed_kgraph.unwrap();
let max_edges_embedded = self.get_max_edge_length_embedded_kgraph(nbng);
if max_edges_embedded.is_none() {
log::error!("cannot compute mean edge length from embedded data");
return None;
}
let max_edges_embedded = max_edges_embedded.unwrap();
assert_eq!(max_edges_embedded.len(), transformed_kgraph.len());
let mut embedded_radii = CKMS::<f64>::new(0.01);
let mut ratio_dist_q = CKMS::<f64>::new(0.01);
let mut max_edges_q = CKMS::<f64>::new(0.01);
let nb_nodes = max_edges_embedded.len();
let mut nodes_match = Vec::with_capacity(nb_nodes);
let mut first_dist = Vec::with_capacity(nb_nodes);
let mut mean_ratio = (0. , 0usize);
for i in 0..nb_nodes {
assert_eq!(i, max_edges_embedded[i].0);
assert_eq!(i, transformed_kgraph[i].0);
nodes_match.push(0);
embedded_radii.insert(max_edges_embedded[i].1);
max_edges_q.insert(max_edges_embedded[i].1);
let neighbours = &transformed_kgraph[i].1;
for e in 0..neighbours.len() {
if neighbours[e].weight.to_f64().unwrap() <= max_edges_embedded[i].1 {
nodes_match[i] += 1;
}
ratio_dist_q.insert(neighbours[e].weight.to_f64().unwrap() / max_edges_embedded[i].1);
mean_ratio.0 += neighbours[e].weight.to_f64().unwrap() / max_edges_embedded[i].1;
}
mean_ratio.1 += neighbours.len();
first_dist.push(neighbours[0].weight.to_f64().unwrap());
}
let nb_without_match = nodes_match.iter().fold(0, |acc, x| if *x == 0 {acc +1} else {acc});
let mean_nbmatch: f64 = nodes_match.iter().sum::<usize>() as f64 / (nodes_match.len() - nb_without_match) as f64;
println!("\n\n a guess at quality ");
println!(" nb neighbourhoods without a match : {}, mean number of neighbours conserved when match : {:.3e}", nb_without_match, mean_nbmatch);
println!(" embedded radii quantiles at 0.05 : {:.2e} , 0.25 : {:.2e}, 0.5 : {:.2e}, 0.75 : {:.2e}, 0.85 : {:.2e}, 0.95 : {:.2e} \n",
embedded_radii.query(0.05).unwrap().1, embedded_radii.query(0.25).unwrap().1, embedded_radii.query(0.5).unwrap().1,
embedded_radii.query(0.75).unwrap().1, embedded_radii.query(0.85).unwrap().1, embedded_radii.query(0.95).unwrap().1);
println!("\n quantiles on max edges in embedded space");
println!(" quantiles at 0.05 : {:.2e} , 0.25 : {:.2e}, 0.5 : {:.2e}, 0.75 : {:.2e}, 0.85 : {:.2e}, 0.95 : {:.2e} \n",
max_edges_q.query(0.05).unwrap().1, max_edges_q.query(0.25).unwrap().1, max_edges_q.query(0.5).unwrap().1,
max_edges_q.query(0.75).unwrap().1, max_edges_q.query(0.85).unwrap().1, max_edges_q.query(0.95).unwrap().1);
println!("\n statistics on conservation of neighborhood (of size nbng)");
println!(" quantiles on ratio : distance in embedded space of neighbours of origin space / distance of last neighbour in embedded space");
println!(" quantiles at 0.05 : {:.2e} , 0.25 : {:.2e}, 0.5 : {:.2e}, 0.75 : {:.2e}, 0.85 : {:.2e}, 0.95 : {:.2e} \n",
ratio_dist_q.query(0.05).unwrap().1, ratio_dist_q.query(0.25).unwrap().1, ratio_dist_q.query(0.5).unwrap().1,
ratio_dist_q.query(0.75).unwrap().1, ratio_dist_q.query(0.85).unwrap().1, ratio_dist_q.query(0.95).unwrap().1);
let median_ratio = ratio_dist_q.query(0.5).unwrap().1;
println!("\n quality index: ratio of distance to neighbours in origin space / distance to last neighbour in embedded space");
println!(" neighborhood are conserved in radius multiplied by median : {:.2e}, mean {:.2e} ", median_ratio, mean_ratio.0 / mean_ratio.1 as f64);
let mut csv_dist = Writer::from_path("first_dist.csv").unwrap();
let _res = write_csv_labeled_array2(&mut csv_dist, first_dist.as_slice(), &self.get_embedded_reindexed());
csv_dist.flush().unwrap();
let cpu_time: Duration = cpu_start.elapsed();
log::info!(" quality estimation, sys time(s) {:?} cpu time {:?}", sys_now.elapsed().unwrap().as_secs(), cpu_time.as_secs());
return Some(quality);
}
#[allow(unused)]
fn get_scale_from_umap(&self, norm: f64, neighbours: &Vec<OutEdge<F>>) -> (f32, Vec<f32>) {
let nbgh = neighbours.len();
let rho_x = neighbours[0].weight.to_f32().unwrap();
let mut dist = neighbours
.iter()
.map(|n| n.weight.to_f32().unwrap())
.collect::<Vec<f32>>();
let f = |beta: f32| {
dist.iter()
.map(|d| (-(d - rho_x) * beta).exp())
.sum::<f32>()
};
let beta = dichotomy_solver(false, f, 0f32, f32::MAX, norm as f32).unwrap();
for i in 0..nbgh {
dist[i] = (-(dist[i] - rho_x) * beta).exp();
}
(1. / beta, dist)
}
pub fn get_nb_nodes(&self) -> usize {
self.initial_space.as_ref().unwrap().get_nb_nodes()
}
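/// Cross-entropy optimization: computes the initial cross entropy, runs `nb_grad_batch`
/// batches of edge-sampled gradient iterations with a linearly decaying step, and returns
/// the optimized embedding.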
fn entropy_optimize(&self, params : &EmbedderParams, initial_embedding : &Array2<F>) -> Result<Array2<F>, String> {
log::debug!("in Embedder::entropy_optimize");
if self.initial_space.is_none() {
log::error!("Embedder::entropy_optimize : initial_space not constructed, exiting");
return Err(String::from(" initial_space not constructed, no NodeParams"));
}
let ce_optimization = EntropyOptim::new(self.initial_space.as_ref().unwrap(), params, initial_embedding);
let start = ProcessTime::now();
let initial_ce = ce_optimization.ce_compute_threaded();
let cpu_time: Duration = start.elapsed();
println!(" initial cross entropy value {:.2e}, in time {:?}", initial_ce, cpu_time);
let grad_step_init = params.grad_step;
log::info!("grad_step_init : {:.2e}", grad_step_init);
log::debug!("in Embedder::entropy_optimize ... gradient iterations");
let nb_sample_by_iter = params.nb_sampling_by_edge * ce_optimization.get_nb_edges();
log::info!("\n optimizing embedding");
log::info!(" nb edges {} , number of edge sampling by grad iteration {}", ce_optimization.get_nb_edges(), nb_sample_by_iter);
log::info!(" nb iteration : {} sampling size {} ", self.get_nb_grad_batch(), nb_sample_by_iter);
let cpu_start = ProcessTime::now();
let sys_start = SystemTime::now();
for iter in 1..=self.get_nb_grad_batch() {
let grad_step = grad_step_init * (1.- iter as f64/self.get_nb_grad_batch() as f64);
ce_optimization.gradient_iteration_threaded(nb_sample_by_iter, grad_step);
}
println!(" gradient iterations sys time(s) {:.2e} , cpu_time(s) {:.2e}", sys_start.elapsed().unwrap().as_secs(), cpu_start.elapsed().as_secs());
let final_ce = ce_optimization.ce_compute_threaded();
println!(" final cross entropy value {:.2e}", final_ce);
let dim = self.get_asked_dimension();
let nbrow = self.get_nb_nodes();
let mut reindexed = Array2::<F>::zeros((nbrow, dim));
for i in 0..nbrow {
let row = ce_optimization.get_embedded_data(i);
for j in 0..dim {
reindexed[[i,j]] = row.read()[j];
}
}
Ok(reindexed)
}
}
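/// State of the cross-entropy optimization: the flattened edge list with an alias table for
/// weight-proportional edge sampling, and the embedded coordinates stored one `RwLock`-protected
/// row per node so gradient updates can run in parallel.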
struct EntropyOptim<'a, F> {
node_params: &'a NodeParams,
edges : Vec<(NodeIdx, OutEdge<f32>)>,
embedded : Vec<Arc<RwLock<Array1<F>>>>,
embedded_scales : Vec<f32>,
pos_edge_distribution : WeightedAliasIndex<f32>,
params : &'a EmbedderParams,
}
impl <'a, F> EntropyOptim<'a,F>
where F: Float + NumAssign + std::iter::Sum + num_traits::cast::FromPrimitive + Send + Sync + ndarray::ScalarOperand {
pub fn new(node_params : &'a NodeParams, params: &'a EmbedderParams, initial_embed : &Array2<F>) -> Self {
log::debug!("entering EntropyOptim::new");
let nbng = node_params.params[0].edges.len();
let nbnodes = node_params.get_nb_nodes();
let mut edges = Vec::<(NodeIdx, OutEdge<f32>)>::with_capacity(nbnodes*nbng);
let mut edges_weight = Vec::<f32>::with_capacity(nbnodes*nbng);
let mut initial_scales = Vec::<f32>::with_capacity(nbnodes);
for i in 0..nbnodes {
initial_scales.push(node_params.params[i].scale);
for j in 0..node_params.params[i].edges.len() {
edges.push((i, node_params.params[i].edges[j]));
edges_weight.push(node_params.params[i].edges[j].weight);
}
}
log::debug!("construction alias table for sampling edges..");
let start = ProcessTime::now();
let pos_edge_sampler = WeightedAliasIndex::new(edges_weight).unwrap();
let cpu_time: Duration = start.elapsed();
log::debug!("constructied alias table for sampling edges.. , time : {:?}", cpu_time);
let mut embedded = Vec::<Arc<RwLock<Array1<F>>>>::new();
let nbrow = initial_embed.nrows();
for i in 0..nbrow {
embedded.push(Arc::new(RwLock::new(initial_embed.row(i).to_owned())));
}
let embedded_scales = estimate_embedded_scales_from_initial_scales(&initial_scales);
let mut scales_q = CKMS::<f32>::new(0.001);
for s in &embedded_scales {
scales_q.insert(*s);
}
println!("\n\n embedded scales quantiles at 0.05 : {:.2e} , 0.5 : {:.2e}, 0.95 : {:.2e}, 0.99 : {:.2e}",
scales_q.query(0.05).unwrap().1, scales_q.query(0.5).unwrap().1,
scales_q.query(0.95).unwrap().1, scales_q.query(0.99).unwrap().1);
println!("");
EntropyOptim { node_params, edges, embedded, embedded_scales,
pos_edge_distribution : pos_edge_sampler,
params : params}
}
pub fn get_nb_edges(&self) -> usize {
self.edges.len()
}
#[allow(unused)]
fn get_embedded_raw(& self) -> Array2<F> {
let nbrow = self.embedded.len();
let nbcol = self.params.asked_dim;
let mut embedding_res = Array2::<F>::zeros((nbrow, nbcol));
for i in 0..nbrow {
let row = self.embedded[i].read();
for j in 0..nbcol {
embedding_res[[i,j]] = row[j];
}
}
return embedding_res;
}
#[allow(unused)]
pub fn get_embedded_reindexed(& self, indexset: &IndexSet<NodeIdx>) -> Array2<F> {
let nbrow = self.embedded.len();
let nbcol = self.params.asked_dim;
let mut embedding_res = Array2::<F>::zeros((nbrow, nbcol));
for i in 0..nbrow {
let row = self.embedded[i].read();
let origin_id = indexset.get_index(i).unwrap();
for j in 0..nbcol {
embedding_res[[*origin_id,j]] = row[j];
}
}
return embedding_res;
}
fn get_embedded_data(&self, row : usize) -> Arc<RwLock<Array1<F>>> {
Arc::clone(&self.embedded[row])
}
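// Cross entropy between graph weights w_ij and embedded Cauchy weights w'_ij:
// CE = sum over edges (i,j) of -w_ij * ln(w'_ij) - (1 - w_ij) * ln(1 - w'_ij)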
#[allow(unused)]
fn ce_compute(&self) -> f64 {
log::trace!("\n entering EntropyOptim::ce_compute");
let mut ce_entropy = 0.;
let b : f64 = self.params.b;
for edge in self.edges.iter() {
let node_i = edge.0;
let node_j = edge.1.node;
assert!(node_i != node_j);
let weight_ij = edge.1.weight as f64;
let weight_ij_embed = cauchy_edge_weight(&self.embedded[node_i].read(),
self.embedded_scales[node_i] as f64, b,
&self.embedded[node_j].read()).to_f64().unwrap();
if weight_ij_embed > 0. {
ce_entropy += -weight_ij * weight_ij_embed.ln();
}
if weight_ij_embed < 1. {
ce_entropy += - (1. - weight_ij) * (1. - weight_ij_embed).ln();
}
if !ce_entropy.is_finite() {
log::debug!("weight_ij {} weight_ij_embed {}", weight_ij, weight_ij_embed);
std::panic!();
}
}
ce_entropy
}
fn ce_compute_threaded(&self) -> f64 {
log::trace!("\n entering EntropyOptim::ce_compute_threaded");
let b : f64 = self.params.b;
let ce_entropy = self.edges.par_iter()
.fold( || 0.0f64, | entropy : f64, edge| entropy + {
let node_i = edge.0;
let node_j = edge.1.node;
let weight_ij = edge.1.weight as f64;
let weight_ij_embed = cauchy_edge_weight(&self.embedded[node_i].read(),
self.embedded_scales[node_i] as f64, b,
&self.embedded[node_j].read()).to_f64().unwrap();
let mut term = 0.;
if weight_ij_embed > 0. {
term += -weight_ij * weight_ij_embed.ln();
}
if weight_ij_embed < 1. {
term += - (1. - weight_ij) * (1. - weight_ij_embed).ln();
}
term
})
.sum::<f64>();
return ce_entropy;
}
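// One stochastic gradient step: sample an edge (i,j) proportionally to its weight, apply an
// attractive update moving y_i and y_j towards each other (coefficient clipped below at -0.49),
// then draw asked_nb_neg negative nodes (non-neighbours of i, skipped when their row lock is busy)
// and apply repulsive updates to y_i (coefficient clipped above at 2.).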
fn ce_optim_edge_shannon(&self, threaded : bool, grad_step : f64)
where
F: Float + NumAssign + std::iter::Sum + num_traits::cast::FromPrimitive + ndarray::ScalarOperand
{
let edge_idx_sampled : usize;
let mut y_i;
let mut y_j;
let node_j;
let node_i;
if threaded {
edge_idx_sampled = thread_rng().sample(&self.pos_edge_distribution);
node_i = self.edges[edge_idx_sampled].0;
node_j = self.edges[edge_idx_sampled].1.node;
y_i = self.get_embedded_data(node_i).read().to_owned();
y_j = self.get_embedded_data(node_j).read().to_owned();
} else {
edge_idx_sampled = thread_rng().sample(&self.pos_edge_distribution);
node_i = self.edges[edge_idx_sampled].0;
y_i = self.get_embedded_data(node_i).write().to_owned();
node_j = self.edges[edge_idx_sampled].1.node;
y_j = self.get_embedded_data(node_j).write().to_owned();
};
let dim = self.params.asked_dim;
let mut gradient = Array1::<F>::zeros(dim);
assert!(node_i != node_j);
let weight = self.edges[edge_idx_sampled].1.weight as f64;
assert!(weight <= 1.);
let scale = self.embedded_scales[node_i] as f64;
let b : f64 = self.params.b;
let d_ij : f64 = y_i.iter().zip(y_j.iter()).map(|(vi,vj)| (*vi-*vj)*(*vi-*vj)).sum::<F>().to_f64().unwrap();
let d_ij_scaled = d_ij/(scale*scale);
let coeff : f64;
if b != 1. {
let cauchy_weight = 1./ (1. + d_ij_scaled.powf(b));
coeff = 2. * b * cauchy_weight * d_ij_scaled.powf(b - 1.)/ (scale*scale);
}
else {
let cauchy_weight = 1./ (1. + d_ij_scaled);
coeff = 2. * b * cauchy_weight / (scale*scale);
}
if d_ij_scaled > 0. {
let alpha = (1. / PROBA_MIN) as f64;
let coeff_repulsion = 1. / (d_ij_scaled * d_ij_scaled).max(alpha);
let coeff_ij = (grad_step * coeff * (- weight + (1. - weight) * coeff_repulsion)).max(-0.49);
gradient = (&y_j - &y_i) * F::from(coeff_ij).unwrap();
log::trace!("norm attracting coeff {:.2e} gradient {:.2e}", coeff_ij, l2_norm(&gradient.view()).to_f64().unwrap());
}
y_i -= &gradient;
y_j += &gradient;
*(self.get_embedded_data(node_j).write()) = y_j;
let asked_nb_neg = 5;
let mut got_nb_neg = 0;
let mut _nb_failed = 0;
while got_nb_neg < asked_nb_neg {
let neg_node : NodeIdx = thread_rng().gen_range(0..self.embedded_scales.len());
if neg_node != node_i && neg_node != node_j && self.node_params.get_node_param(node_i).get_edge(neg_node).is_none() {
let neg_data = self.get_embedded_data(neg_node);
let y_k;
if let Some(lock_read) = neg_data.try_read() {
y_k = lock_read.to_owned();
got_nb_neg += 1;
}
else {
log::trace!("neg lock failed");
_nb_failed += 1;
continue;
}
let d_ik : f64 = y_i.iter().zip(y_k.iter()).map(|(vi,vj)| (*vi-*vj)*(*vi-*vj)).sum::<F>().to_f64().unwrap();
let d_ik_scaled = d_ik/(scale*scale);
let coeff : f64;
if b != 1. {
let cauchy_weight = 1./ (1. + d_ik_scaled.powf(b));
coeff = 2. * b * cauchy_weight * d_ik_scaled.powf(b - 1.)/ (scale*scale);
}
else {
let cauchy_weight = 1./ (1. + d_ik_scaled);
coeff = 2. * b * cauchy_weight/ (scale*scale);
}
let alpha = 1. / 16.;
if d_ik > 0. {
let coeff_repulsion = 1. / (d_ik_scaled * d_ik_scaled).max(alpha);
let coeff_ik = (grad_step * coeff * coeff_repulsion).min(2.);
gradient = (&y_k - &y_i) * F::from_f64(coeff_ik).unwrap();
log::trace!("norm repulsive coeff gradient {:.2e} {:.2e}", coeff_ik , l2_norm(&gradient.view()).to_f64().unwrap());
}
y_i -= &gradient;
} } *(self.get_embedded_data(node_i).write()) = y_i;
}
#[allow(unused)]
fn gradient_iteration(&self, nb_sample : usize, grad_step : f64) {
for _ in 0..nb_sample {
self.ce_optim_edge_shannon(false, grad_step);
}
}
fn gradient_iteration_threaded(&self, nb_sample : usize, grad_step : f64) {
(0..nb_sample).into_par_iter().for_each( |_| self.ce_optim_edge_shannon(true, grad_step));
}
}
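/// Converts the raw distances of a k-NN graph into per-node edge probabilities
/// (see `get_scale_from_proba_normalisation`), collecting perplexity, scale and weight
/// quantiles along the way. Exits if some node has no neighbour.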
pub(crate) fn to_proba_edges<F>(kgraph : & KGraph<F>, scale_rho : f32, beta : f32) -> NodeParams
where F : Float + num_traits::cast::FromPrimitive + std::marker::Sync + std::marker::Send + std::fmt::UpperExp + std::iter::Sum {
let mut perplexity_q : CKMS<f32> = CKMS::<f32>::new(0.001);
let mut scale_q : CKMS<f32> = CKMS::<f32>::new(0.001);
let mut weight_q : CKMS<f32> = CKMS::<f32>::new(0.001);
let neighbour_hood = kgraph.get_neighbours();
let scale_perplexity = | i : usize | -> (usize, Option<(f32, NodeParam)>) {
if neighbour_hood[i].len() > 0 {
let node_param = get_scale_from_proba_normalisation(kgraph, scale_rho, beta, &neighbour_hood[i]);
let perplexity = node_param.get_perplexity();
return (i, Some((perplexity, node_param)));
}
else {
return (i, None);
}
};
let mut opt_node_params : Vec::<(usize,Option<(f32, NodeParam)>)> = Vec::<(usize,Option<(f32, NodeParam)>)>::new();
let mut node_params : Vec<NodeParam> = (0..neighbour_hood.len()).into_iter().map(|_| NodeParam::default()).collect();
(0..neighbour_hood.len()).into_par_iter().map(|i| scale_perplexity(i)).collect_into_vec(&mut opt_node_params);
let mut max_nbng = 0;
for opt_param in &opt_node_params {
match opt_param {
(i, Some(param)) => {
perplexity_q.insert(param.0);
scale_q.insert(param.1.scale);
let j = thread_rng().gen_range(0..param.1.edges.len());
weight_q.insert(param.1.edges[j].weight);
max_nbng = param.1.edges.len().max(max_nbng);
assert_eq!(param.1.edges.len(), neighbour_hood[*i].len());
node_params[*i] = param.1.clone();
}
(i, None) => {
println!("to_proba_edges , node rank {}, has no neighbour, use hnsw.set_keeping_pruned(true)", i);
log::error!("to_proba_edges , node rank {}, has no neighbour, use hnsw.set_keeping_pruned(true)", i);
std::process::exit(1);
}
};
}
println!("\n constructed initial space");
println!("\n scales quantile at 0.05 : {:.2e} , 0.5 : {:.2e}, 0.95 : {:.2e}, 0.99 : {:.2e}",
scale_q.query(0.05).unwrap().1, scale_q.query(0.5).unwrap().1,
scale_q.query(0.95).unwrap().1, scale_q.query(0.99).unwrap().1);
println!("\n edge weight quantile at 0.05 : {:.2e} , 0.5 : {:.2e}, 0.95 : {:.2e}, 0.99 : {:.2e}",
weight_q.query(0.05).unwrap().1, weight_q.query(0.5).unwrap().1,
weight_q.query(0.95).unwrap().1, weight_q.query(0.99).unwrap().1);
println!("\n perplexity quantile at 0.05 : {:.2e} , 0.5 : {:.2e}, 0.95 : {:.2e}, 0.99 : {:.2e}",
perplexity_q.query(0.05).unwrap().1, perplexity_q.query(0.5).unwrap().1,
perplexity_q.query(0.95).unwrap().1, perplexity_q.query(0.99).unwrap().1);
println!("");
NodeParams::new(node_params, max_nbng)
}
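/// Computes a node's scale and normalized edge probabilities. The scale is `scale_rho` times the
/// mean of the first-neighbour distances over the node and its neighbours. Weights are remapped by
/// exp(-((d - d_first).max(0) / scale)^beta), clamped below by PROBA_MIN and normalized to sum to 1.
/// If all neighbour distances are equal, edges get the uniform probability 1/nbgh.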
fn get_scale_from_proba_normalisation<F> (kgraph : & KGraph<F>, scale_rho : f32, beta : f32, neighbours: &Vec<OutEdge<F>>) -> NodeParam
where F : Float + num_traits::cast::FromPrimitive + Sync + Send + std::fmt::UpperExp + std::iter::Sum {
let nbgh = neighbours.len();
assert!(nbgh > 0);
let rho_x = neighbours[0].weight.to_f32().unwrap();
let mut rho_y_s = Vec::<f32>::with_capacity(neighbours.len() + 1);
for i in 0..nbgh {
let y_i = neighbours[i].node;
rho_y_s.push(kgraph.get_neighbours()[y_i][0].weight.to_f32().unwrap());
}
rho_y_s.push(rho_x);
let mean_rho = rho_y_s.iter().sum::<f32>() / (rho_y_s.len() as f32);
let scale = scale_rho * mean_rho;
let mut all_equal = false;
let first_dist = neighbours[0].weight.to_f32().unwrap();
let last_n = neighbours.iter().rfind(|&n| n.weight.to_f32().unwrap() > 0.);
if last_n.is_none() {
all_equal = true;
}
let remap_weight = |w: F, shift: f32, scale: f32, beta: f32| (-((w.to_f32().unwrap() - shift).max(0.) / scale).powf(beta)).exp();
if !all_equal {
let last_dist = last_n.unwrap().weight.to_f32().unwrap();
if last_dist > first_dist {
if remap_weight(F::from(last_dist).unwrap(), first_dist, scale, beta) / remap_weight(F::from(first_dist).unwrap(), first_dist, scale, beta) < PROBA_MIN {
log::info!("too large variation of neighbour probabilities, increase scale_rho or reduce beta");
}
let mut probas_edge = neighbours
.iter()
.map(|n| OutEdge::<f32>::new(n.node, remap_weight(n.weight, first_dist, scale, beta).max(PROBA_MIN)) )
.collect::<Vec<OutEdge<f32>>>();
let proba_range = probas_edge[probas_edge.len() - 1].weight / probas_edge[0].weight;
log::trace!(" first dist {:2e} last dist {:2e}", first_dist, last_dist);
log::trace!("scale : {:.2e} , first neighbour proba {:2e}, last neighbour proba {:2e} proba gap {:.2e}", scale, probas_edge[0].weight,
probas_edge[probas_edge.len() - 1].weight,
proba_range);
if proba_range < PROBA_MIN {
log::error!(" first dist {:2e} last dist {:2e}", first_dist, last_dist);
log::error!("scale : {:.2e} , first neighbour proba {:2e}, last neighbour proba {:2e} proba gap {:.2e}", scale, probas_edge[0].weight,
probas_edge[probas_edge.len() - 1].weight,
proba_range);
}
assert!(proba_range >= PROBA_MIN, "proba range {:.2e} too low edge proba, increase scale_rho or reduce beta", proba_range);
let sum = probas_edge.iter().map(|e| e.weight).sum::<f32>();
for i in 0..nbgh {
probas_edge[i].weight /= sum;
}
return NodeParam::new(scale, probas_edge);
}
else {
all_equal = true;
}
}
if all_equal {
let probas_edge = neighbours
.iter()
.map(|n| OutEdge::<f32>::new(n.node, 1.0 / nbgh as f32))
.collect::<Vec<OutEdge<f32>>>();
return NodeParam::new(scale, probas_edge);
}
else {
log::error!("fatal error in get_scale_from_proba_normalisation, should not happen!");
std::panic!("incoherence error");
}
}
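/// Clamps `f` to [-max, max].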
fn clip<F>(f : F, max : f64) -> F
where F: Float + num_traits::FromPrimitive {
let f_r = f.to_f64().unwrap();
let truncated = if f_r > max {
log::trace!("truncated >");
max
}
else if f_r < -max {
log::trace!("truncated <");
-max
}
else {
f_r
};
return F::from(truncated).unwrap();
}
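// Embedded-space edge weight, a Cauchy-type kernel kept strictly below 1:
// w'(y_i, y_j) = 1 / (1 + (||y_i - y_j||^2 / scale^2)^b)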
fn cauchy_edge_weight<F>(initial_point: &Array1<F>, scale: f64, b : f64, other: &Array1<F>) -> F
where
F: Float + std::iter::Sum + num_traits::FromPrimitive
{
let dist = initial_point
.iter()
.zip(other.iter())
.map(|(i, f)| (*i - *f) * (*i - *f))
.sum::<F>();
let mut dist_f64 = dist.to_f64().unwrap() / (scale*scale);
dist_f64 = dist_f64.powf(b);
let weight = 1. / (1. + dist_f64);
let mut weight_f = F::from_f64(weight).unwrap();
if !(weight_f < F::one()) {
weight_f = F::one() - F::epsilon();
log::trace!("cauchy_edge_weight fixing dist_f64 {:2.e}", dist_f64);
}
assert!(weight_f < F::one());
assert!(weight_f.is_normal());
return weight_f;
}
fn l2_norm<F>(v: &ArrayView1<'_, F>) -> F
where F : Float + std::iter::Sum + num_traits::cast::FromPrimitive {
v.iter().map(|x| (*x) * (*x)).sum::<F>().sqrt()
}
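// Maps initial-space scales to embedded-space scales: width * (scale / mean_scale),
// with the ratio clamped to [1/4, 4].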
fn estimate_embedded_scales_from_initial_scales(initial_scales :&Vec<f32>) -> Vec<f32> {
log::trace!("estimate_embedded_scale_from_initial_scales");
let mean_scale : f32 = initial_scales.iter().sum::<f32>() / (initial_scales.len() as f32);
let scale_sup = 4.0;
let scale_inf = 1. / scale_sup;
let width = 0.2;
let embedded_scale : Vec<f32> = initial_scales.iter().map(|&x| width * (x/mean_scale).min(scale_sup).max(scale_inf)).collect();
for i in 0..embedded_scale.len() {
log::trace!("embedded scale for node {} : {:.2e}", i , embedded_scale[i]);
}
embedded_scale
}
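// Recenters each column to zero mean and rescales all coordinates into [-box_size/2, box_size/2].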
fn set_data_box<F>(data : &mut Array2<F>, box_size : f64)
where F: Float + NumAssign + std::iter::Sum<F> + num_traits::cast::FromPrimitive + ndarray::ScalarOperand {
let nbdata = data.nrows();
let dim = data.ncols();
let mut means = Array1::<F>::zeros(dim);
let mut max_max = F::zero();
for j in 0..dim {
for i in 0..nbdata {
means[j] += data[[i,j]];
}
means[j] /= F::from(nbdata).unwrap();
}
for j in 0..dim {
for i in 0..nbdata {
data[[i,j]] = data[[i,j]] - means[j];
max_max = max_max.max(data[[i,j]].abs());
}
}
max_max /= F::from(0.5 * box_size).unwrap();
for f in data.iter_mut() {
*f = (*f)/max_max;
assert!((*f).abs() <= F::one());
}
}
#[cfg(test)]
mod tests {
use super::*;
fn log_init_test() {
let _ = env_logger::builder().is_test(true).try_init();
}
#[cfg(test)]
fn gen_rand_data_f32(nb_elem: usize , dim:usize) -> Vec<Vec<f32>> {
let mut data = Vec::<Vec<f32>>::with_capacity(nb_elem);
let mut rng = thread_rng();
let unif = Uniform::<f32>::new(0.,1.);
for i in 0..nb_elem {
let val = 2. * i as f32 * rng.sample(unif);
let v :Vec<f32> = (0..dim).into_iter().map(|_| val * rng.sample(unif)).collect();
data.push(v);
}
data
}
#[test]
fn mini_embed_full() {
log_init_test();
let nb_elem = 500;
let embed_dim = 20;
let data = gen_rand_data_f32(nb_elem, embed_dim);
let data_with_id = data.iter().zip(0..data.len()).collect();
let ef_c = 50;
let max_nb_connection = 50;
let nb_layer = 16.min((nb_elem as f32).ln().trunc() as usize);
let mut hns = Hnsw::<f32, DistL1>::new(max_nb_connection, nb_elem, nb_layer, ef_c, DistL1{});
hns.set_keeping_pruned(true);
hns.parallel_insert(&data_with_id);
hns.dump_layer_info();
let knbn = 10;
log::info!("calling kgraph.init_from_hnsw_all");
let kgraph : KGraph<f32> = kgraph_from_hnsw_all(&hns, knbn).unwrap();
log::info!("minimum number of neighbours {}", kgraph.get_max_nbng());
let _kgraph_stats = kgraph.get_kraph_stats();
let mut embed_params = EmbedderParams::default();
embed_params.asked_dim = 5;
let mut embedder = Embedder::new(&kgraph, embed_params);
let embed_res = embedder.embed();
assert!(embed_res.is_ok());
}
}