use indexmap::IndexMap;
use std::collections::HashMap;
use std::fs::File;
use std::path::Path;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::fmt;
use glob::{glob_with, MatchOptions};
use std::io::Read;
use yaml_rust::{Yaml, YamlLoader};
use std::marker::PhantomData;
use std::cmp::min;
use rayon::prelude::*;
use super::*;
use super::errors::*;
use super::labels::*;
use super::labels::values::*;
use crate::datasources::*;
use crate::datasources::DataSource;
use super::distances::*;
use crate::utils::*;
/// A collection of `f32` points spread over one or more data sources, each paired with a
/// label (metadata) source, plus name <-> index lookup tables.
///
/// `M` selects the distance metric; it is carried only at the type level.
pub struct PointCloud<M: Metric> {
    /// Backing storage for point coordinates; the first element of an `addresses` value
    /// indexes into this vector.
    data_sources: Vec<Box<dyn DataSource>>,
    /// Label sources, pushed in lockstep with `data_sources` (same position, same row count).
    label_sources: Vec<MetadataList>,
    /// Schema the label sources were opened with.
    labels_scheme: LabelScheme,
    /// Point index -> `(data source position, row within that source)`.
    addresses: IndexMap<PointIndex, (usize, usize)>,
    /// Point name -> point index.
    names_to_indexes: IndexMap<PointName, PointIndex>,
    /// Point index -> point name.
    indexes_to_names: IndexMap<PointIndex, PointName>,
    /// Cache of owned point copies handed out by `get_center`.
    loaded_centers: Mutex<IndexMap<PointIndex, Arc<Vec<f32>>>>,
    /// Dimension passed to the data sources when they were opened.
    data_dim: usize,
    /// Size threshold / chunk length used by the parallel distance routines.
    chunk: usize,
    /// Zero-sized marker tying the cloud to its metric type.
    metric: PhantomData<M>,
}
impl<M: Metric> fmt::Debug for PointCloud<M> {
    /// Summarises the cloud as its number of addressable points and number of data sources.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let point_count = self.addresses.len();
        let source_count = self.data_sources.len();
        f.write_fmt(format_args!(
            "PointCloud {{ number of points: {}, number of memmaps: {}}}",
            point_count, source_count
        ))
    }
}
impl<M: Metric> PointCloud<M> {
    /// Work size used by the parallel distance routines for points of dimension `data_dim`.
    ///
    /// Keeps the original `min(15000 / data_dim, 20)` heuristic but never returns 0:
    /// rayon's `par_chunks(0)` panics, and the raw heuristic yields 0 for `data_dim > 15000`.
    /// A `data_dim` of 0 is treated as 1 instead of dividing by zero.
    fn chunk_size(data_dim: usize) -> usize {
        min(15000 / data_dim.max(1), 20).max(1)
    }

    /// Builds a point cloud from parallel lists of data files and label files.
    ///
    /// `data_path[i]` is paired with `labels_path[i]`. Every row of every file gets a global
    /// `PointIndex`, assigned in file order. A row's name comes from its label entry, falling
    /// back to the decimal form of its index. A row whose name was already seen is reported on
    /// stdout and left out of the lookup tables, but it still consumes an index.
    ///
    /// When `ram` is true, each memmap is converted with `convert_to_ram` before use.
    ///
    /// # Errors
    /// Propagates failures from opening a data memmap or a label file.
    ///
    /// # Panics
    /// If `data_path` and `labels_path` have different lengths, or if a data file and its
    /// label file hold different numbers of rows.
    pub fn from_memmap_files(
        data_dim: usize,
        labels_scheme: LabelScheme,
        data_path: &[PathBuf],
        labels_path: &[PathBuf],
        ram: bool,
    ) -> PointCloudResult<PointCloud<M>> {
        if data_path.len() != labels_path.len() {
            panic!(
                "Mismatch of label and data paths Data: {:?}, Labels: {:?}",
                data_path, labels_path
            );
        }
        let mut addresses = IndexMap::new();
        let mut names_to_indexes: IndexMap<PointName, PointIndex> = IndexMap::new();
        let mut indexes_to_names: IndexMap<PointIndex, PointName> = IndexMap::new();
        let mut current_count: u64 = 0;
        let mut data_sources: Vec<Box<dyn DataSource>> = Vec::with_capacity(data_path.len());
        let mut label_sources = Vec::with_capacity(labels_path.len());
        for (i, (dp, lp)) in data_path.iter().zip(labels_path).enumerate() {
            let new_data: Box<dyn DataSource> = if ram {
                Box::new(DataMemmap::new(data_dim, &dp)?.convert_to_ram())
            } else {
                Box::new(DataMemmap::new(data_dim, &dp)?)
            };
            let new_labels = labels_scheme.open(&lp)?;
            if new_data.len() != new_labels.len() {
                panic!("The data count {:?} differs from the label count {:?} for the {}th data and label files", new_data.len(), new_labels.len(), i);
            }
            for j in 0..new_data.len() {
                let name = new_labels
                    .get_name(j)
                    .unwrap_or_else(|| current_count.to_string());
                if names_to_indexes.contains_key(&name) {
                    println!("Duplicate {:?} on line {} of file {:?}", &name, j, lp);
                } else {
                    names_to_indexes.insert(name.clone(), current_count);
                    indexes_to_names.insert(current_count, name);
                    addresses.insert(current_count, (i, j));
                }
                // Indexes advance even for duplicates so they stay aligned with row order.
                current_count += 1;
            }
            data_sources.push(new_data);
            label_sources.push(new_labels);
        }
        Ok(PointCloud {
            data_sources,
            label_sources,
            names_to_indexes,
            indexes_to_names,
            addresses,
            data_dim,
            labels_scheme,
            loaded_centers: Mutex::new(IndexMap::new()),
            chunk: Self::chunk_size(data_dim),
            metric: PhantomData,
        })
    }

    /// Builds a single-source point cloud from an in-memory buffer and its labels.
    ///
    /// Row `j` gets point index `j`. Its name comes from `labels`, falling back to the decimal
    /// form of `j`. Rows with an already-seen name are reported on stdout and left out of the
    /// lookup tables.
    ///
    /// # Errors
    /// Propagates failures from `DataRam::new` and from deriving the label scheme.
    pub fn from_ram(
        data: Box<[f32]>,
        data_dim: usize,
        labels: MetadataList,
    ) -> PointCloudResult<PointCloud<M>> {
        let data_source: Box<dyn DataSource> = Box::new(DataRam::new(data_dim, data)?);
        let labels_scheme = labels.scheme()?;
        let mut addresses = IndexMap::new();
        let mut names_to_indexes: IndexMap<PointName, PointIndex> = IndexMap::new();
        let mut indexes_to_names: IndexMap<PointIndex, PointName> = IndexMap::new();
        for j in 0..data_source.len() {
            let name = labels.get_name(j).unwrap_or_else(|| j.to_string());
            if names_to_indexes.contains_key(&name) {
                println!("Duplicate {:?} on line {} of file", &name, j);
            } else {
                let index = j as PointIndex;
                names_to_indexes.insert(name.clone(), index);
                indexes_to_names.insert(index, name);
                addresses.insert(index, (0, j));
            }
        }
        Ok(PointCloud {
            data_sources: vec![data_source],
            label_sources: vec![labels],
            names_to_indexes,
            indexes_to_names,
            addresses,
            data_dim,
            loaded_centers: Mutex::new(IndexMap::new()),
            labels_scheme,
            chunk: Self::chunk_size(data_dim),
            metric: PhantomData,
        })
    }

    /// Builds a point cloud from a parsed YAML configuration.
    ///
    /// Keys read are:
    /// - `data_path` and `labels_path` (glob patterns);
    /// - `data_dim`;
    /// - either `schema` (a `column: type` mapping) or `labels_dim`, which defines a single
    ///   `f32` vector column `y`;
    /// - optional `in_ram`, which defaults to `true`.
    ///
    /// # Panics
    /// If a required key is missing or has the wrong YAML type.
    pub fn from_yaml(params: &Yaml) -> PointCloudResult<PointCloud<M>> {
        let data_paths = get_file_list(
            params["data_path"]
                .as_str()
                .expect("Unable to read the 'data_path'"),
        );
        let labels_paths = get_file_list(
            params["labels_path"]
                .as_str()
                .expect("Unable to read the 'labels_path'"),
        );
        let data_dim = params["data_dim"]
            .as_i64()
            .expect("Unable to read the 'data_dim'") as usize;
        let mut deser = LabelScheme::new();
        if params["schema"].is_badvalue() {
            let labels_dim = params["labels_dim"]
                .as_i64()
                .expect("Unable to read the 'labels_dim' or the 'schema'")
                as usize;
            deser.add_vector("y".to_string(), labels_dim, "f32");
        } else {
            build_label_schema_yaml(&mut deser, &params["schema"]);
        }
        let ram_bool = params["in_ram"].as_bool().unwrap_or(true);
        PointCloud::<M>::from_memmap_files(data_dim, deser, &data_paths, &labels_paths, ram_bool)
    }

    /// Reads a YAML config file and builds the point cloud it describes via `from_yaml`.
    ///
    /// Only the first YAML document in the file is used.
    ///
    /// # Panics
    /// If the file cannot be opened or read, is not valid YAML, or contains no document.
    pub fn from_file<P: AsRef<Path>>(path: P) -> PointCloudResult<PointCloud<M>> {
        let path = path.as_ref();
        let mut config_file = File::open(path)
            .unwrap_or_else(|e| panic!("Unable to read config file {:?}: {}", path, e));
        let mut config = String::new();
        config_file
            .read_to_string(&mut config)
            .unwrap_or_else(|e| panic!("Unable to read config file {:?}: {}", path, e));
        let params_files = YamlLoader::load_from_str(&config)
            .unwrap_or_else(|e| panic!("Unable to parse config file {:?}: {}", path, e));
        let params = params_files
            .first()
            .unwrap_or_else(|| panic!("Config file {:?} contains no YAML document", path));
        PointCloud::<M>::from_yaml(params)
    }

    /// Builds an in-memory point cloud whose labels are plain `f32` vectors.
    ///
    /// # Panics
    /// If `data` and `labels` describe different numbers of points, or if either dimension
    /// is 0.
    pub fn simple_from_ram(
        data: Box<[f32]>,
        data_dim: usize,
        labels: Box<[f32]>,
        labels_dim: usize,
    ) -> PointCloudResult<PointCloud<M>> {
        assert_eq!(
            data.len() / data_dim,
            labels.len() / labels_dim,
            "data and labels must describe the same number of points"
        );
        let list = MetadataList::simple_vec(labels, labels_dim);
        PointCloud::<M>::from_ram(data, data_dim, list)
    }

    /// Total number of rows across all data sources.
    ///
    /// Rows skipped as duplicate names are still counted, so this can exceed
    /// `reference_indexes().len()`.
    pub fn len(&self) -> usize {
        self.data_sources.iter().map(|source| source.len()).sum()
    }

    /// Dimension the data sources were opened with.
    pub fn dim(&self) -> usize {
        self.data_dim
    }

    /// Every addressable point index, in insertion order.
    pub fn reference_indexes(&self) -> Vec<PointIndex> {
        self.addresses.keys().cloned().collect()
    }

    /// Returns a shared, owned copy of point `pn`.
    ///
    /// The copy is built only on the first request and cached; later calls return the
    /// cached `Arc` without touching the data source again.
    ///
    /// # Errors
    /// Propagates a failure to read the point on a cache miss.
    pub fn get_center(&self, pn: PointIndex) -> PointCloudResult<Arc<Vec<f32>>> {
        let mut loaded_centers = self.loaded_centers.lock().unwrap();
        if let Some(center) = loaded_centers.get(&pn) {
            return Ok(Arc::clone(center));
        }
        let center = Arc::new(self.get_point(pn)?.to_vec());
        loaded_centers.insert(pn, Arc::clone(&center));
        Ok(center)
    }

    /// Resolves a point index to `(data source position, row within that source)`.
    ///
    /// # Panics
    /// If `pn` is not an addressable index.
    #[inline]
    fn get_address(&self, pn: PointIndex) -> PointCloudResult<(usize, usize)> {
        match self.addresses.get(&pn) {
            Some(&address) => Ok(address),
            None => panic!("Index not found: {:?}", pn),
        }
    }

    /// Borrowed coordinates of point `pn`, read from its data source.
    pub fn get_point(&self, pn: PointIndex) -> PointCloudResult<&[f32]> {
        let (i, j) = self.get_address(pn)?;
        self.data_sources[i].get(j)
    }

    /// Name registered for index `pi`, if any.
    pub fn get_name(&self, pi: &PointIndex) -> Option<&PointName> {
        self.indexes_to_names.get(pi)
    }

    /// Index registered for name `pn`, if any.
    pub fn get_index(&self, pn: &PointName) -> Option<&PointIndex> {
        self.names_to_indexes.get(pn)
    }

    /// Every registered point name, in insertion order.
    pub fn get_names(&self) -> Vec<PointName> {
        self.names_to_indexes.keys().cloned().collect()
    }

    /// JSON description of the label schema.
    pub fn schema_json(&self) -> String {
        self.labels_scheme.schema_json()
    }

    /// Label metadata for point `pn`.
    pub fn get_metadata(&self, pn: PointIndex) -> PointCloudResult<Metadata> {
        let (i, j) = self.get_address(pn)?;
        self.label_sources[i].get(j)
    }

    /// Summarises the labels of `pns`.
    ///
    /// The indexes are grouped by label source, each source summarises its own rows, and the
    /// per-source summaries are combined.
    ///
    /// # Errors
    /// Propagates failures from any per-source summary or from combining them.
    pub fn get_metasummary(&self, pns: &[PointIndex]) -> PointCloudResult<MetaSummary> {
        let mut disk_splits: Vec<Vec<usize>> = vec![Vec::new(); self.label_sources.len()];
        for pn in pns {
            let (i, j) = self.get_address(*pn)?;
            disk_splits[i].push(j);
        }
        let disk_summaries = self
            .label_sources
            .iter()
            .zip(&disk_splits)
            .map(|(labels, indexes)| labels.get_summary(indexes))
            .collect::<Result<Vec<MetaSummary>, _>>()?;
        MetaSummary::combine(&disk_summaries)
    }

    /// Row-major `is.len() x js.len()` distance matrix.
    ///
    /// Entry `k * js.len() + l` is `M::dense(point(is[k]), point(js[l]))`. When the matrix is
    /// larger than `self.chunk`, rows are computed in parallel with rayon, one row per task.
    ///
    /// # Errors
    /// Returns an error if any point fails to load. In the parallel path, which error is
    /// reported is unspecified when several loads fail.
    pub fn distances_to_point_indices(
        &self,
        is: &[PointIndex],
        js: &[PointIndex],
    ) -> PointCloudResult<Vec<f32>> {
        let mut dists: Vec<f32> = vec![0.0; is.len() * js.len()];
        // A non-empty `js` is guaranteed here: an empty one makes the product 0, which never
        // exceeds `self.chunk`, so `par_chunks_mut` never gets a zero chunk size.
        if is.len() * js.len() > self.chunk {
            let dist_iter = dists.par_chunks_mut(js.len());
            let indexes_iter = is.par_iter().map(|i| (i, js));
            let error: Mutex<Result<(), PointCloudError>> = Mutex::new(Ok(()));
            dist_iter
                .zip(indexes_iter)
                .for_each(|(chunk_dists, (i, chunk_indexes))| match self.get_point(*i) {
                    Ok(x) => {
                        for (d, j) in chunk_dists.iter_mut().zip(chunk_indexes) {
                            match self.get_point(*j) {
                                Ok(y) => *d = (M::dense)(x, y),
                                Err(e) => *error.lock().unwrap() = Err(e),
                            }
                        }
                    }
                    Err(e) => *error.lock().unwrap() = Err(e),
                });
            error.into_inner().unwrap()?;
        } else {
            for (k, i) in is.iter().enumerate() {
                let x = self.get_point(*i)?;
                for (l, j) in js.iter().enumerate() {
                    let y = self.get_point(*j)?;
                    dists[k * js.len() + l] = (M::dense)(x, y);
                }
            }
        }
        Ok(dists)
    }

    /// Distances from point `i` to each point in `indexes`, in order.
    pub fn distances_to_point_index(
        &self,
        i: PointIndex,
        indexes: &[PointIndex],
    ) -> PointCloudResult<Vec<f32>> {
        self.distances_to_point(self.get_point(i)?, indexes)
    }

    /// Pairwise distances between all entries of `indexes`.
    ///
    /// Each unordered pair is stored once, keyed `(smaller, larger)`.
    pub fn adj(&self, mut indexes: &[PointIndex]) -> PointCloudResult<AdjMatrix> {
        let n = indexes.len();
        // Upper bound on the number of distinct pairs; `saturating_sub` keeps n == 0 safe.
        let mut vals = HashMap::with_capacity(n * n.saturating_sub(1) / 2);
        while indexes.len() > 1 {
            let i = indexes[0];
            indexes = &indexes[1..];
            let distances = self.distances_to_point_index(i, indexes)?;
            for (&j, d) in indexes.iter().zip(distances) {
                let key = if i < j { (i, j) } else { (j, i) };
                vals.insert(key, d);
            }
        }
        Ok(AdjMatrix { vals })
    }

    /// Distances from the coordinates `x` to each point in `indexes`, in order.
    ///
    /// Inputs longer than `3 * self.chunk` are processed in parallel, in chunks of
    /// `self.chunk` points.
    ///
    /// # Errors
    /// Returns an error if any point fails to load. In the parallel path, which error is
    /// reported is unspecified when several loads fail.
    pub fn distances_to_point(
        &self,
        x: &[f32],
        indexes: &[PointIndex],
    ) -> PointCloudResult<Vec<f32>> {
        let len = indexes.len();
        if len > self.chunk * 3 {
            let mut dists: Vec<f32> = vec![0.0; len];
            // `self.chunk` is at least 1 (see `chunk_size`), as `par_chunks` requires.
            let dist_iter = dists.par_chunks_mut(self.chunk);
            let indexes_iter = indexes.par_chunks(self.chunk);
            let error: Mutex<Result<(), PointCloudError>> = Mutex::new(Ok(()));
            dist_iter
                .zip(indexes_iter)
                .for_each(|(chunk_dists, chunk_indexes)| {
                    for (d, i) in chunk_dists.iter_mut().zip(chunk_indexes) {
                        match self.get_point(*i) {
                            Ok(y) => *d = (M::dense)(x, y),
                            Err(e) => *error.lock().unwrap() = Err(e),
                        }
                    }
                });
            error.into_inner().unwrap()?;
            Ok(dists)
        } else {
            indexes
                .iter()
                .map(|i| {
                    let y = self.get_point(*i)?;
                    Ok((M::dense)(x, y))
                })
                .collect()
        }
    }
}
/// Registers every `column: type` entry of a YAML mapping on `label_scheme`.
///
/// Recognised types are `u32`, `f32`, `i32`, `bool` and `string`. The type `name`
/// registers the column through `add_name_column`.
///
/// # Panics
/// If `schema_yaml` is not a mapping, if a key or type is not a YAML string, or if a type
/// is not one of the recognised names.
fn build_label_schema_yaml(label_scheme: &mut LabelScheme, schema_yaml: &Yaml) {
    let schema_map = schema_yaml.as_hash().unwrap_or_else(|| {
        panic!(
            "Need to correctly edit the yaml: the schema must be a mapping of (VALUE: TYPE), got {:?}",
            schema_yaml
        )
    });
    for (k, v) in schema_map.iter() {
        // Report the offending entry instead of a bare `Option::unwrap` panic, since these
        // values come straight from a user-edited config file.
        let key = k
            .as_str()
            .unwrap_or_else(|| panic!("Schema column names must be strings, got {:?}", k))
            .to_string();
        let type_name = v.as_str().unwrap_or_else(|| {
            panic!(
                "Schema type for column {:?} must be a string, got {:?}",
                key, v
            )
        });
        match type_name {
            "u32" => label_scheme.add_u32(key),
            "f32" => label_scheme.add_f32(key),
            "i32" => label_scheme.add_i32(key),
            "bool" => label_scheme.add_bool(key),
            "string" => label_scheme.add_string(key),
            "name" => label_scheme.add_name_column(&key),
            _ => panic!(
                "Unknown type in schema yaml, also it should be (VALUE: TYPE): {:?}",
                (k, v)
            ),
        }
    }
}
fn get_file_list(files_reg: &str) -> Vec<PathBuf> {
let options = MatchOptions {
case_sensitive: false,
..Default::default()
};
let mut paths = Vec::new();
let glob_paths = match glob_with(files_reg, &options) {
Ok(expr) => expr,
Err(e) => panic!("Pattern reading error {:?}", e),
};
for entry in glob_paths {
let path = match entry {
Ok(expr) => expr,
Err(e) => panic!("Error reading path {:?}", e),
};
paths.push(path)
}
paths
}