use anyhow::*;
use rayon::prelude::*;
use std::sync::Arc;
use dashmap::DashMap;
use std::collections::HashMap;
use std::collections::hash_map;
use std::hash::Hash;
use ndarray::{Array1, Array2};
use std::io::Write;
use rand::Rng;
use rand_xoshiro::Xoshiro256PlusPlus;
use rand_xoshiro::rand_core::SeedableRng;
use cpu_time::ProcessTime;
use std::time::{Duration, SystemTime};
use crate::bmor::*;
use crate::discrete::DiscreteProba;
use crate::facility::*;
use crate::makeiter::*;
use anndists::dist::*;
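/// A point drawn from the sampling distribution: its DataId, its rank in the distribution
/// and the probability with which it was drawn.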
#[derive(Copy, Clone, Debug)]
struct Point<DataId> {
pub(self) id: DataId,
pub(self) _rank: usize,
pub(self) proba: f64,
}
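/// Samples data points according to a discrete probability distribution over their sensitivities.
/// `w_index` maps the rank of a weight in the distribution back to the corresponding DataId.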
struct PointSampler<DataId> {
proba: DiscreteProba<f64>,
w_index: HashMap<usize, DataId>,
}
impl<DataId: Clone> PointSampler<DataId> {
fn new(weights: &Vec<f64>, w_index: HashMap<usize, DataId>) -> Self {
PointSampler {
proba: DiscreteProba::new(weights),
w_index,
}
}
fn sample<R>(&self, rng: &mut R) -> Point<DataId>
where
R: Rng,
{
let (rank, proba) = self.proba.sample(rng);
let id = match self.w_index.get(&rank) {
Some(id) => id.clone(),
None => {
log::error!("no id for rank sampled : {}", rank);
std::panic!();
}
};
Point {
id,
_rank: rank,
proba,
}
}
}
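/// A coreset: a weighted subset of the original data.
/// `id_weight_map` gives the weight attached to each sampled DataId, `datas_wid` optionally stores the
/// corresponding data vectors, and `distance` is the distance used to compare points.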
pub struct CoreSet<DataId, T: Send + Sync + Clone, Dist: Distance<T> + Clone + Sync + Send> {
id_weight_map: HashMap<DataId, f64>,
datas_wid: Option<Vec<(DataId, Vec<T>)>>,
distance: Dist,
}
impl<DataId, T: Send + Sync + Clone, Dist> CoreSet<DataId, T, Dist>
where
DataId: Eq + Hash + Send + Sync + Clone + std::fmt::Debug,
Dist: Distance<T> + Clone + Sync + Send,
{
pub fn new(
core_w: HashMap<DataId, f64>,
datas_wid: Option<Vec<(DataId, Vec<T>)>>,
distance: Dist,
) -> CoreSet<DataId, T, Dist> {
if let Some(datas) = datas_wid.as_ref() {
assert_eq!(core_w.len(), datas.len());
}
CoreSet {
id_weight_map: core_w,
datas_wid,
distance,
}
}
pub fn get_nb_points(&self) -> usize {
self.id_weight_map.len()
}
pub fn get_weight(&self, data_id: &DataId) -> Option<f64> {
let index_res = self.id_weight_map.get(data_id);
index_res.copied()
}
pub fn get_data_ids(&self) -> hash_map::Keys<DataId, f64> {
self.id_weight_map.keys()
}
pub fn get_items(&self) -> hash_map::Iter<DataId, f64> {
self.id_weight_map.iter()
}
pub fn get_data_points(&self) -> Option<&Vec<(DataId, Vec<T>)>> {
self.datas_wid.as_ref()
}
pub(crate) fn get_point_by_rank(&self, r: usize) -> Option<(DataId, &Vec<T>)> {
match self.datas_wid.as_ref() {
Some(v) => {
if r < v.len() {
Some((v[r].0.clone(), &v[r].1))
} else {
log::error!(
"get_point_by_rank could not find data vector r : {} size : {}",
r,
v.len()
);
std::panic!("get_point_by_rank could not find data vector r");
}
}
None => {
log::error!("get_point_by_rank could not find data vector r : {}", r);
std::panic!("get_point_by_rank could not find data vector r");
}
}
}
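/// Dumps the (DataId, weight) pairs of the coreset to the csv file coreset.csv and returns the number of records written.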
pub fn dump(&self) -> anyhow::Result<usize> {
let name = String::from("coreset.csv");
let file = std::fs::File::create(&name)?;
let mut bufw = std::io::BufWriter::new(file);
let mut nb_record = 0usize;
for (id, weight) in &self.id_weight_map {
writeln!(bufw, "{:?},{:.3e}", id, weight)?;
nb_record += 1;
}
bufw.flush()?;
println!(
" coreset dumped in file : {}, nb_record {}",
name, nb_record
);
assert_eq!(nb_record, self.get_nb_points());
Ok(nb_record)
}
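/// Computes the full pairwise distance matrix between the coreset points, one row per point, rows computed in parallel.
/// Returns the DataIds in row (and column) order together with the matrix.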
pub fn compute_distances(&self) -> Option<(Vec<DataId>, Array2<f32>)> {
let nbpoints = self.get_nb_points();
let mut distances = Array2::<f32>::zeros((0, nbpoints));
let compute_row = |i| -> Array1<f32> {
let mut row_i = Array1::zeros(nbpoints);
let (_, p_i) = self.get_point_by_rank(i).unwrap();
for j in 0..nbpoints {
let p_j = self.get_point_by_rank(j).unwrap().1;
if j != i {
row_i[j] = self.distance.eval(p_i, p_j);
}
}
row_i
};
let rows: Vec<(usize, Array1<f32>)> = (0..nbpoints)
.into_par_iter()
.map(|i| (i, compute_row(i)))
.collect();
for (r, v) in &rows {
assert_eq!(*r, distances.shape()[0]);
distances.push_row(v.into()).unwrap();
}
let ids: Vec<DataId> = self
.datas_wid
.as_ref()
.unwrap()
.iter()
.map(|(id, _)| id.clone())
.collect();
Some((ids, distances))
}
}
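/// Builds a coreset in two passes over the data.
/// The first pass runs the Bmor streaming algorithm to get a set of facilities; the second pass assigns
/// every point to its nearest facility (filling `point_facility_map`), from which sensitivities are derived
/// and the coreset is sampled.
///
/// A minimal usage sketch (with a hypothetical `iter_generator` implementing `MakeIter`):
/// ```ignore
/// let mut builder = Coreset1::new(k, nb_data_expected, beta, gamma, distance);
/// let coreset = builder.make_coreset(&iter_generator, 0.1)?;
/// ```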
pub struct Coreset1<DataId, T: Send + Sync + Clone, Dist: Distance<T> + Clone + Sync + Send> {
phase: usize,
nb_data: usize,
bmor: Bmor<DataId, T, Dist>,
facilities: Option<Facilities<DataId, T, Dist>>,
point_facility_map: Option<Arc<DashMap<DataId, PointMap>>>,
}
impl<DataId, T: Send + Sync + Clone, Dist> Coreset1<DataId, T, Dist>
where
Dist: Distance<T> + Clone + Sync + Send,
DataId: Eq + Hash + std::fmt::Debug + Clone + Send + Sync,
{
pub fn new(k: usize, nbdata_expected: usize, beta: f64, gamma: f64, distance: Dist) -> Self {
let bmor = Bmor::new(k, nbdata_expected, beta, gamma, distance);
let phase = 0usize;
Coreset1 {
phase,
nb_data: 0,
bmor,
facilities: None,
point_facility_map: None,
}
}
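/// Runs the two passes over the data provided by `iter_generator` and returns the sampled coreset.
/// `fraction` is the fraction of the data to sample: roughly fraction * nb_data weighted points are drawn
/// (repeated draws of the same point have their weights merged).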
pub fn make_coreset<IterGenerator>(
&mut self,
iter_generator: &IterGenerator,
fraction: f64,
) -> anyhow::Result<CoreSet<DataId, T, Dist>>
where
IterGenerator: MakeIter<Item = (DataId, Vec<T>)>,
DataId: Eq + Hash + std::fmt::Debug + Send + Sync,
{
let cpu_start = ProcessTime::now();
let sys_now = SystemTime::now();
let iter = iter_generator.makeiter();
let res1 = self.process_data_iterator(iter);
if res1.is_err() {
log::error!("first pass failed");
return Err(anyhow!("first pass failed"));
}
log::debug!("end of first pass, second pass to compute point facility map");
self.facilities.as_mut().unwrap().empty();
self.init_facility_map(self.nb_data);
let iter = iter_generator.makeiter();
let res2 = self.process_data_iterator(iter);
if res2.is_err() {
log::error!("seond pass failed");
return Err(anyhow!("second pass failed"));
}
log::debug!("end of second pass, doing sensitivity and sampling computations");
let sampler = self.build_sampling_distribution();
self.point_facility_map = None;
let id_weight_map = self.sample_coreset(&sampler, fraction);
let distance = self.facilities.as_ref().unwrap().get_distance();
let id_data_map = self.retrieve_corepoints_by_id(&id_weight_map, iter_generator);
let cpu_time: Duration = cpu_start.elapsed();
println!(
"\n Coreset1::make_coreset sys time(ms) {:?} cpu time(ms) {:?}",
sys_now.elapsed().unwrap().as_millis(),
cpu_time.as_millis()
);
Ok(CoreSet::new(
id_weight_map,
Some(id_data_map),
distance.clone(),
))
}
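/// Re-reads the data with a fresh iterator and collects the data vectors of the points whose ids were sampled in the coreset.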
fn retrieve_corepoints_by_id<IterGenerator>(
&self,
id_weight_map: &HashMap<DataId, f64>,
iter_generator: &IterGenerator,
) -> Vec<(DataId, Vec<T>)>
where
IterGenerator: MakeIter<Item = (DataId, Vec<T>)>,
DataId: Eq + Hash + std::fmt::Debug,
{
let iter = iter_generator.makeiter();
let nbpoints = id_weight_map.len();
let mut datas_wid: Vec<(DataId, Vec<T>)> = Vec::with_capacity(nbpoints);
for (id, data) in iter {
if id_weight_map.contains_key(&id) {
datas_wid.push((id, data));
}
}
if datas_wid.len() < nbpoints {
let mut set = indexmap::IndexSet::with_capacity(nbpoints + 100);
for (id, _) in &datas_wid {
set.insert(id);
}
for id in id_weight_map.keys() {
if set.get(id).is_none() {
log::error!(" we do not have id : {:?} in set", id);
}
}
}
assert_eq!(datas_wid.len(), nbpoints);
datas_wid
}
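/// Consumes a data iterator, buffering points in blocks of 50000 before handing them to process_data, then closes the pass.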
fn process_data_iterator(
&mut self,
mut iter: impl Iterator<Item = (DataId, Vec<T>)>,
) -> anyhow::Result<()> {
let bufsize: usize = 50000;
let mut datas = Vec::<Vec<T>>::with_capacity(bufsize);
let mut ids = Vec::<DataId>::with_capacity(bufsize);
loop {
let data_opt = iter.next();
match data_opt {
Some((id, data)) => {
datas.push(data);
ids.push(id);
if datas.len() == bufsize {
let res = self.process_data(&datas, &ids);
assert!(res.is_ok());
datas.clear();
ids.clear();
}
}
_ => {
if !datas.is_empty() {
let res = self.process_data(&datas, &ids);
assert!(res.is_ok());
datas.clear();
ids.clear();
}
break;
}
}
}
self.end_pass();
Ok(())
}
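/// Processes one block of data. In phase 0 the block is forwarded to the Bmor algorithm;
/// in phase 1 each point is dispatched in parallel to its nearest facility and its PointMap is
/// recorded in `point_facility_map`.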
fn process_data(&mut self, data: &[Vec<T>], data_id: &[DataId]) -> anyhow::Result<()> {
if self.phase == 0 {
let _ = self.bmor.process_data(data, data_id).unwrap();
self.bmor.log();
self.nb_data += data.len();
Ok(())
} else {
let facilities_ref = self.facilities.as_ref().unwrap();
let dispatch_i = |item: usize| {
let (facility, dist) = facilities_ref
.get_nearest_facility(&data[item], false)
.unwrap();
let weight = 1.;
self.facilities
.as_ref()
.unwrap()
.insert_point(facility, dist, weight);
match &self.point_facility_map {
Some(f_map) => {
let p_map = PointMap::new(facility, dist, 1.);
let res = f_map.insert(data_id[item].clone(), p_map);
if res.is_some() {
log::error!("data_id {:?} is already present error", data_id[item]);
std::panic!();
}
log::trace!(
"inserted PointMap for data_id {:?} in facility map",
data_id[item]
);
}
_ => {
std::panic!("no facility_map allocated, should not happen")
}
}
};
(0..data.len()).into_par_iter().for_each(dispatch_i);
Ok(())
}
}
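/// Closes the current pass: after the first pass the facilities are retrieved from Bmor,
/// after the second pass facility weights and costs are recomputed.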
fn end_pass(&mut self) {
match self.phase {
0 => {
self.phase += 1;
let contraction = false;
self.facilities = Some(self.bmor.end_data(contraction));
log::debug!("end of first pass, processed nb data : {:?}", self.nb_data);
}
1 => {
log::debug!("end of second pass, doing sensitivity and sampling computations");
let _ = self.facilities.as_mut().unwrap().compute_weight_cost();
self.facilities.as_ref().unwrap().log(0);
}
_ => {
std::panic!("should not occurr");
}
}
}
pub fn get_facility_map(&self) -> Option<&Arc<DashMap<DataId, PointMap>>> {
self.point_facility_map.as_ref()
}
pub fn get_nb_data(&self) -> usize {
self.nb_data
}
fn init_facility_map(&mut self, capacity: usize) {
self.point_facility_map = Some(Arc::new(DashMap::with_capacity(capacity)))
}
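/// Builds the sampling distribution from the sensitivities. For a point x of weight w(x) assigned to
/// facility c(x), the sampling probability is
/// p(x) = 1/2 * ( w(x) * d(x, c(x)) / cost + w(x) / (|F| * w(c(x))) )
/// where cost is the global cost of the facilities, |F| the number of facilities and w(c(x)) the weight
/// of the facility x is assigned to. The probabilities must sum to 1 (checked up to 1.0E-5).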
fn build_sampling_distribution(&mut self) -> PointSampler<DataId> {
let facilities_ref = self.facilities.as_ref().unwrap();
let global_cost = facilities_ref.get_cost();
log::debug!(
"build_sampling_distribution got global cost : {:.3e}",
global_cost
);
let p_facility_map_ref = self.point_facility_map.as_ref().unwrap();
let nb_facilities = facilities_ref.len();
let mut cumul_proba = 0.;
let mut p_weights = Vec::<f64>::with_capacity(self.nb_data);
let mut w_index = HashMap::<usize, DataId>::with_capacity(self.nb_data);
let pmap_iter = p_facility_map_ref.iter();
for iter_ref in pmap_iter {
let (dataid, pointmap) = iter_ref.pair();
let mut proba = pointmap.get_dist() as f64 * pointmap.get_weight() as f64 / global_cost;
let f_weight = facilities_ref.get_facility_weight(pointmap.get_facility());
proba += pointmap.get_weight() as f64 / (nb_facilities as f64 * f_weight.unwrap());
proba *= 0.5;
assert!(proba > 0.);
p_weights.push(proba);
w_index.insert(p_weights.len() - 1, dataid.clone());
cumul_proba += proba;
}
log::debug!("cumul_proba : {:.5e}", cumul_proba);
assert!((1. - cumul_proba).abs() < 1.0E-5);
PointSampler::new(&p_weights, w_index)
}
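/// Draws rate * nb_data points from the sampling distribution. A point drawn with probability p
/// contributes a weight 1 / (nb_sample * p); repeated draws of the same id accumulate their weights.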
fn sample_coreset(
&mut self,
sampler: &PointSampler<DataId>,
rate: f64,
) -> HashMap<DataId, f64> {
log::info!("sample_coreset fraction : {:.2e}", rate);
let nb_sample = (rate * self.nb_data as f64) as usize;
let mut coreset = HashMap::<DataId, f64>::with_capacity(2 * nb_sample);
let mut rng = Xoshiro256PlusPlus::seed_from_u64(14537);
for _ in 0..nb_sample {
let point = sampler.sample(&mut rng);
let weight = 1. / (point.proba * nb_sample as f64);
let id_weights = coreset.get_mut(&point.id);
match id_weights {
Some(old_weight) => {
*old_weight += weight;
}
None => {
coreset.insert(point.id, weight);
}
}
}
log::info!(
"sensitivity::sample_coreset coreset nb points : {}",
coreset.len()
);
coreset
}
}