use std::collections::HashMap;
use std::hash::Hash;

use rayon::iter::{IntoParallelIterator, ParallelIterator};

use cpu_time::ProcessTime;
use num_cpus;
use std::time::SystemTime;

use std::io::Write;

use anndists::dist::*;

use crate::sensitivity::*;
use crate::makeiter::*;
use crate::wkmedian::*;

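/// Parameters of the streaming (bmor) pass that builds the coreset: the expected
/// number of data points and the `beta`/`gamma` parameters of the bmor algorithm.
///
/// A minimal sketch (not taken from the crate's documentation), just showing how
/// the constructor and the default are used:
/// ```ignore
/// // expect ~2 million points, keep beta = gamma = 2.
/// let arg = BmorArg::new(2_000_000, 2., 2.);
/// let default_arg = BmorArg::default();
/// ```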
#[derive(Copy, Clone)]
pub struct BmorArg {
    nb_data_expected: usize,
    beta: f64,
    gamma: f64,
}

impl BmorArg {
    pub fn new(nb_data_expected: usize, beta: f64, gamma: f64) -> Self {
        BmorArg {
            nb_data_expected,
            beta,
            gamma,
        }
    }
}

impl Default for BmorArg {
    fn default() -> Self {
        BmorArg {
            nb_data_expected: 1_000_000,
            beta: 2.,
            gamma: 2.,
        }
    }
}

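/// Streaming coreset construction followed by a k-medoid clustering of the coreset,
/// with a final pass assigning every data point to its nearest medoid.
///
/// A hedged end-to-end sketch, assuming an illustrative `producer` implementing
/// `MakeIter<Item = (usize, Vec<f32>)>` and a distance such as `DistL2` from `anndists`
/// (names other than those of this struct are not part of this module):
/// ```ignore
/// let mut clusterer = ClusterCoreset::<usize, f32>::new(10, 0.1, BmorArg::default());
/// clusterer.compute(DistL2::default(), 25, &producer);
/// clusterer.dispatch(&DistL2::default(), &producer);
/// ```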
pub struct ClusterCoreset<DataId: std::fmt::Debug + Eq + std::hash::Hash + Clone + Send + Sync, T> {
    /// number of clusters asked for
    nb_cluster: usize,
    /// fraction of the data to keep in the coreset
    fraction: f64,
    /// parameters of the streaming (bmor) pass
    bmor_arg: BmorArg,
    /// number of data points seen during coreset construction
    nb_data: usize,
    /// the k-medoid structure computed on the coreset
    kmedoids: Option<Kmedoid<DataId, T>>,
    /// maps each DataId to the DataId of its cluster medoid, filled by dispatch
    ids_to_cluster: Option<HashMap<DataId, DataId>>,
}

impl<DataId, T> ClusterCoreset<DataId, T>
where
    DataId: Default + Eq + Hash + Send + Sync + Clone + std::fmt::Debug,
    T: Clone + Send + Sync + std::fmt::Debug,
{
    pub fn new(nb_cluster: usize, fraction: f64, bmor_arg: BmorArg) -> Self {
        ClusterCoreset {
            nb_cluster,
            fraction,
            bmor_arg,
            nb_data: 0,
            kmedoids: None,
            ids_to_cluster: None,
        }
    }

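    /// Builds the coreset by streaming the data provided by `iter_producer`, then runs
    /// at most `nb_iter` k-medoid iterations on the coreset with the given `distance`.
    ///
    /// A hedged sketch of a call, with the same illustrative `producer` as in the
    /// struct-level example:
    /// ```ignore
    /// clusterer.compute(DistL2::default(), 25, &producer);
    /// ```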
    pub fn compute<Dist, IterProducer>(
        &mut self,
        distance: Dist,
        nb_iter: usize,
        iter_producer: &IterProducer,
    ) where
        Dist: Distance<T> + Send + Sync + Clone,
        IterProducer: MakeIter<Item = (DataId, Vec<T>)>,
    {
        let cpu_start = ProcessTime::now();
        let sys_now = SystemTime::now();
        // stream the data and build the coreset
        let mut coreset1 = Coreset1::<DataId, T, Dist>::new(
            self.nb_cluster,
            self.bmor_arg.nb_data_expected,
            self.bmor_arg.beta,
            self.bmor_arg.gamma,
            distance.clone(),
        );
        let result = coreset1.make_coreset(iter_producer, self.fraction);
        log::info!(
            "make_coreset done, sys time(ms) {}, cpu time(ms) {}",
            sys_now.elapsed().unwrap().as_millis(),
            cpu_start.elapsed().as_millis()
        );
        if result.is_err() {
            log::error!("construction of coreset1 failed");
            std::process::exit(1);
        }
        let coreset = result.unwrap();
        log::info!("coreset1 nb different points : {}", coreset.get_nb_points());
        // cluster the coreset points with k-medoids
        log::info!(
            "\n\n doing kmedoid clustering using distance : {}",
            std::any::type_name::<Dist>()
        );
        let nb_cluster = self.nb_cluster;
        let mut kmedoids = Kmedoid::new(&coreset, nb_cluster);
        let (nb_iter, cost) = kmedoids.compute_medians(nb_iter);
        self.nb_data = coreset1.get_nb_data();
        log::info!(
            " kmedoids finished at nb_iter : {}, cost = {:.3e}",
            nb_iter,
            cost
        );
        log::info!(
            " ClusterCoreset::compute (coreset + kmedoids) sys time(ms) {:?} cpu time(ms) {:?}",
            sys_now.elapsed().unwrap().as_millis(),
            cpu_start.elapsed().as_millis()
        );
        self.kmedoids = Some(kmedoids);
    }

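    /// Assigns each data point (re-read via `iter_producer`) to the cluster of its
    /// nearest medoid, processing buffered batches in parallel, then dumps the
    /// assignment to a csv file. Must be called after [`compute`](Self::compute).
    ///
    /// A hedged sketch of a call, with the same illustrative `producer` as above:
    /// ```ignore
    /// clusterer.dispatch(&DistL2::default(), &producer);
    /// ```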
    pub fn dispatch<Dist, IterProducer>(&mut self, distance: &Dist, iter_producer: &IterProducer)
    where
        T: Send + Sync + Clone,
        Dist: Distance<T> + Send + Sync + Clone,
        IterProducer: MakeIter<Item = (DataId, Vec<T>)>,
    {
        let cpu_start = ProcessTime::now();
        let sys_now = SystemTime::now();
        let mut data_iter = iter_producer.makeiter();
        let nb_cpus = num_cpus::get();
        let buffer_size = 5000 * nb_cpus;
        let mut map_to_medoid = HashMap::<DataId, DataId>::with_capacity(self.nb_data);
        self.get_kmedoids().retrieve_cluster_centers(iter_producer);
        let centers = self.kmedoids.as_ref().unwrap().get_centers().unwrap();
        if centers.is_empty() {
            log::error!("ClusterCoreset::dispatch, kmedoids centers have not yet been computed");
            std::process::exit(1);
        }
        // returns, for one data point, (id, rank of the nearest medoid, distance to it)
        let dispatch_i = |(id, data): (DataId, &Vec<T>)| -> (DataId, usize, f32) {
            assert!(!data.is_empty());
            let dists: Vec<f32> = centers.iter().map(|c| distance.eval(data, c)).collect();
            let mut dmin = f32::MAX;
            let mut imin = usize::MAX;
            for (i, d) in dists.iter().enumerate() {
                if *d < dmin {
                    dmin = *d;
                    imin = i;
                }
            }
            if imin >= dists.len() {
                log::error!("\n dispatch failed for id {:?}, FATAL EXITING", id);
                std::process::exit(1);
            }
            (id, imin, dmin)
        };
        let mut dispatching_cost: f64 = 0.;
        let mut nb_total_data = 0usize;
        // process the data in buffered batches, dispatching each batch in parallel
        loop {
            let buffres = self.get_buffer_data(buffer_size, &mut data_iter);
            if buffres.is_err() {
                break;
            }
            let ids_datas = buffres.unwrap();
            nb_total_data += ids_datas.len();
            let res_dispatch: Vec<(DataId, usize, f32)> = ids_datas
                .into_par_iter()
                .map(|(i, d)| dispatch_i((i, &d)))
                .collect();
            for (id, cluster_rank, d) in res_dispatch {
                let c_id_res = self.kmedoids.as_ref().unwrap().get_center_id(cluster_rank);
                if c_id_res.is_err() {
                    log::error!("cannot get center of cluster number {}", cluster_rank);
                    std::process::exit(1);
                }
                let c_id = c_id_res.unwrap();
                map_to_medoid.insert(id, c_id);
                dispatching_cost += d as f64;
            }
        }
        println!(
            "\n end of dispatching all data to their cluster, global cost : {:.3e}, cost by data : {:.3e}",
            dispatching_cost,
            dispatching_cost / nb_total_data as f64
        );
        self.ids_to_cluster = Some(map_to_medoid);
        let _ = self.dump_clusters();
        log::info!(
            "\n ClusterCoreset::dispatch sys time(ms) {:?} cpu time(ms) {:?}",
            sys_now.elapsed().unwrap().as_millis(),
            cpu_start.elapsed().as_millis()
        );
    }

    /// Dumps, for each data point, the DataId of its cluster medoid in the csv file
    /// "clustercoreset.csv" (one `data_id,medoid_id` record per line).
    /// Requires [`dispatch`](Self::dispatch) to have been called before.
    pub fn dump_clusters(&self) -> anyhow::Result<usize> {
        let mut name = String::from("clustercoreset");
        name.push_str(".csv");
        let file = std::fs::File::create(&name)?;
        let mut bufw = std::io::BufWriter::new(file);
        let mut nb_record = 0;
        if self.ids_to_cluster.is_none() {
            log::error!(
                "ClusterCoreset::dump_clusters: the method dispatch should have been called before"
            );
            return Err(anyhow::anyhow!(
                "ClusterCoreset::dump_clusters: the method dispatch should have been called before"
            ));
        }
        for (d, c) in self.ids_to_cluster.as_ref().unwrap() {
            writeln!(bufw, "{:?},{:?}", d, c).unwrap();
            nb_record += 1;
        }
        bufw.flush().unwrap();
        log::info!(
            "ClusterCoreset::dump_clusters, dumped cluster info in file {:?}, nb_record : {:?}",
            name,
            nb_record
        );
        Ok(nb_record)
    }

    /// Fills a buffer with at most `buffer_size` (id, data) pairs taken from the iterator.
    /// Returns Err(0) when the iterator is exhausted and nothing could be read.
    fn get_buffer_data(
        &self,
        buffer_size: usize,
        data_iter: &mut impl Iterator<Item = (DataId, Vec<T>)>,
    ) -> Result<Vec<(DataId, Vec<T>)>, u32> {
        let mut ids_datas = Vec::<(DataId, Vec<T>)>::with_capacity(buffer_size);
        loop {
            match data_iter.next() {
                Some((id, data)) => {
                    ids_datas.push((id, data));
                    if ids_datas.len() == buffer_size {
                        break;
                    }
                }
                _ => {
                    break;
                }
            }
        }
        if !ids_datas.is_empty() {
            Ok(ids_datas)
        } else {
            Err(0)
        }
    }

    fn get_kmedoids(&mut self) -> &mut Kmedoid<DataId, T> {
        self.kmedoids.as_mut().unwrap()
    }
}