coreset/clustercore.rs

//! This module provides a simple user interface for clustering data via a coreset.  
//! The data must be accessed via an iterator, see [makeiter](super::makeiter).  
//! It chains the Bmor and coreset construction passes. The coreset is then clustered with the kmedoid algorithm,
//! and a final pass dispatches all data to their nearest coreset cluster center, recomputing the global
//! cost and storing cluster membership.
//!
//! Clusters are dumped in a csv file named "clustercoreset.csv".  
//! Each line consists of the DataId of an item and the DataId of its cluster center.
//!
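//! The sketch below shows the typical call sequence. The iterator producer `MyIterProducer`, the
//! parameter values and the choice of distance are illustrative assumptions; any type implementing
//! [MakeIter](super::makeiter::MakeIter) with `Item = (DataId, Vec<T>)` can be used.
//!
//! ```ignore
//! use anndists::dist::DistL2;
//!
//! // user supplied iterator producer over the data (implements MakeIter<Item = (usize, Vec<f32>)>)
//! let producer = MyIterProducer::new("some/path");
//! // Bmor parameters : expected number of data, beta, gamma
//! let bmor_arg = BmorArg::new(70_000, 2., 2.);
//! // ask for 10 clusters, keep ~10% of the data in the coreset
//! let mut clustering = ClusterCoreset::<usize, f32>::new(10, 0.1, bmor_arg);
//! // build the coreset and cluster it with kmedoid (at most 25 iterations)
//! clustering.compute(DistL2::default(), 25, &producer);
//! // dispatch the whole data to the medoids, compute global cost and dump membership in a csv file
//! clustering.dispatch(&DistL2::default(), &producer);
//! ```
//!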

use std::collections::HashMap;
use std::hash::Hash;

use rayon::iter::{IntoParallelIterator, ParallelIterator};

use cpu_time::ProcessTime;
use num_cpus;
use std::time::SystemTime;

use std::io::Write;

use anndists::dist::*;

use crate::sensitivity::*;
// use crate::facility::*;
use crate::makeiter::*;
use crate::wkmedian::*;
/// Bmor parameters driving the coreset construction.
/// See [Bmor](super::bmor::Bmor)
#[derive(Copy, Clone)]
pub struct BmorArg {
    nb_data_expected: usize,
    /// drives upper cost acceptable through iterations. 2. is a standard default.
    beta: f64,
    //
    gamma: f64,
}

impl BmorArg {
    /// nb_data_expected : number of data expected
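    ///
    /// A minimal construction sketch; the values shown here are simply the defaults (see the Default impl below):
    /// ```ignore
    /// let arg = BmorArg::new(1_000_000, 2., 2.);
    /// // equivalent to
    /// let arg_default = BmorArg::default();
    /// ```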
    pub fn new(nb_data_expected: usize, beta: f64, gamma: f64) -> Self {
        BmorArg {
            nb_data_expected,
            beta,
            gamma,
        }
    }
}

impl Default for BmorArg {
    fn default() -> Self {
        BmorArg {
            nb_data_expected: 1_000_000,
            beta: 2.,
            gamma: 2.,
        }
    }
}

//==================================================================

/// Drives the whole clustering chain: coreset construction, kmedoid clustering of the coreset
/// and dispatching of the whole data to the coreset cluster centers.
pub struct ClusterCoreset<DataId: std::fmt::Debug + Eq + std::hash::Hash + Clone + Send + Sync, T> {
    //
    nb_cluster: usize,
    /// fraction of data to keep in coreset
    fraction: f64,
    //
    bmor_arg: BmorArg,
    // total nb data processed
    nb_data: usize,
    /// To store kmedoid result
    kmedoids: Option<Kmedoid<DataId, T>>,
    /// associate each DataId to the DataId of its medoid center (computed by function dispatch)
    ids_to_cluster: Option<HashMap<DataId, DataId>>,
}

impl<DataId, T> ClusterCoreset<DataId, T>
where
    DataId: Default + Eq + Hash + Send + Sync + Clone + std::fmt::Debug,
    T: Clone + Send + Sync + std::fmt::Debug,
{
    /// - nb_cluster : the number of clusters asked for
    /// - fraction : fraction of data to keep in coreset.  
    ///   The fraction is chosen so that the subsequent k-medoids phase is computationally tractable
    ///   and yet sufficient to represent the whole data. (Typically, for Mnist data, ~0.1 is a valid choice.)
    /// - bmor_arg : defines the parameters of the Bmor pass
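    ///
    /// A construction sketch with illustrative values (10 clusters, ~10% of the data kept in the coreset):
    /// ```ignore
    /// let clustering = ClusterCoreset::<usize, f32>::new(10, 0.1, BmorArg::default());
    /// ```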
    pub fn new(nb_cluster: usize, fraction: f64, bmor_arg: BmorArg) -> Self {
        ClusterCoreset {
            nb_cluster,
            fraction,
            bmor_arg,
            nb_data: 0,
            kmedoids: None,
            ids_to_cluster: None,
        }
    }

    /// Computes the coreset and its kmedoid clustering.  
    /// - distance : the metric to use
    /// - nb_iter : the maximal number of iterations in kmedoid.  
    ///
    /// **Note:**  
    /// Only the DataId of the centers are stored in the kmedoid structure, not the data vectors, to spare memory.  
    /// To extract the data vectors, call [dispatch](Self::dispatch()) which retrieves the data vectors and computes the clustering cost with respect to the whole data.
    /// It needs one more pass on the data.
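    ///
    /// A call sketch, assuming `producer` implements [MakeIter](super::makeiter::MakeIter) with
    /// `Item = (usize, Vec<f32>)`; the producer, the distance and the parameter values are illustrative:
    /// ```ignore
    /// let mut clustering = ClusterCoreset::<usize, f32>::new(10, 0.1, BmorArg::default());
    /// clustering.compute(anndists::dist::DistL2::default(), 25, &producer);
    /// ```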
    pub fn compute<Dist, IterProducer>(
        &mut self,
        distance: Dist,
        nb_iter: usize,
        iter_producer: &IterProducer,
    ) where
        Dist: Distance<T> + Send + Sync + Clone,
        IterProducer: MakeIter<Item = (DataId, Vec<T>)>,
    {
        //
        let cpu_start = ProcessTime::now();
        let sys_now = SystemTime::now();
        //
        let mut coreset1 = Coreset1::<DataId, T, Dist>::new(
            self.nb_cluster,
            self.bmor_arg.nb_data_expected,
            self.bmor_arg.beta,
            self.bmor_arg.gamma,
            distance.clone(),
        );
        //
        let result = coreset1.make_coreset(iter_producer, self.fraction);
        log::info!(
            "make_coreset done sys time {}, cpu time {}",
            sys_now.elapsed().unwrap().as_millis(),
            cpu_start.elapsed().as_millis()
        );
        if result.is_err() {
            log::error!("construction of coreset1 failed");
        }
        let coreset = result.unwrap();
        log::info!("coreset1 nb different points : {}", coreset.get_nb_points());
        //
        log::info!(
            "\n\n doing kmedoid clustering using distance : {}",
            std::any::type_name::<Dist>()
        );
        let nb_cluster = self.nb_cluster;
        let mut kmedoids = Kmedoid::new(&coreset, nb_cluster);
        let (nb_iter, cost) = kmedoids.compute_medians(nb_iter);
        // TODO: we have coreset and kmedoids we must store center (Vec<T>) of each medoid!
        self.nb_data = coreset1.get_nb_data();
        //
        log::info!(
            " kmedoids finished at nb_iter : {}, cost = {:.3e}",
            nb_iter,
            cost
        );
        log::info!(
            " ClusterCoreset::compute (coreset + kmedoids) sys time(ms) {:?} cpu time(ms) {:?}",
            sys_now.elapsed().unwrap().as_millis(),
            cpu_start.elapsed().as_millis()
        );
        //
        self.kmedoids = Some(kmedoids);
    } // end of compute

    //

    /// Once the kmedoid clustering is computed, the clustering cost can be computed for the whole data, not just the coreset.
    /// This function also fills in the [Kmedoid] structure the data vector associated to each center, see [Kmedoid::get_cluster_center].
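    ///
    /// A call sketch; `producer` and the distance are assumed to be the same ones passed to [compute](Self::compute()):
    /// ```ignore
    /// clustering.dispatch(&anndists::dist::DistL2::default(), &producer);
    /// // cluster membership has been dumped in the csv file "clustercoreset.csv"
    /// ```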
    pub fn dispatch<Dist, IterProducer>(&mut self, distance: &Dist, iter_producer: &IterProducer)
    where
        T: Send + Sync + Clone,
        Dist: Distance<T> + Send + Sync + Clone,
        IterProducer: MakeIter<Item = (DataId, Vec<T>)>,
    {
        //
        let cpu_start = ProcessTime::now();
        let sys_now = SystemTime::now();
        //
        // We must buffer the data and do the distance computations by blocks
        //
        let mut data_iter = iter_producer.makeiter();
        let nb_cpus = num_cpus::get();
        let buffer_size = 5000 * nb_cpus;
        let mut map_to_medoid = HashMap::<DataId, DataId>::with_capacity(self.nb_data);
        // We must retrieve the data vectors corresponding to the medoid centers
        self.get_kmedoids().retrieve_cluster_centers(iter_producer);
        let centers = self.kmedoids.as_ref().unwrap().get_centers().unwrap();
        if centers.is_empty() {
            log::error!("ClusterCore::dispatch, kmedoids centers have not yet been computed");
            std::process::exit(1);
        }
        //
        // This closure returns, for each data item (id, data), a triplet (id, rank of the nearest center, distance to that center)
        //
        let dispatch_i = |(id, data): (DataId, &Vec<T>)| -> (DataId, usize, f32) {
            // get the nearest medoid. The Vec<T> of each medoid has been retrieved from the coreset above
            assert!(!data.is_empty());
            let dists: Vec<f32> = centers.iter().map(|c| distance.eval(data, c)).collect();
            let mut dmin = f32::MAX;
            let mut imin = usize::MAX;
            for (i, d) in dists.iter().enumerate() {
                if *d < dmin {
                    dmin = *d;
                    imin = i;
                }
            }
            if imin >= dists.len() {
                log::error!("\n dispatch failed for id {:?}, FATAL EXITING", id);
                std::process::exit(1);
            }
            //
            (id, imin, dmin)
        };
        let mut dispatching_cost: f64 = 0.;
        let mut nb_total_data = 0usize;
        //
        loop {
            let buffres = self.get_buffer_data(buffer_size, &mut data_iter);
            if buffres.is_err() {
                break;
            }
            let ids_datas = buffres.unwrap();
            nb_total_data += ids_datas.len();
            // dispatch buffer
            let res_dispatch: Vec<(DataId, usize, f32)> = ids_datas
                .into_par_iter()
                .map(|(i, d)| dispatch_i((i, &d)))
                .collect();
            for (id, cluster_rank, d) in res_dispatch {
                let c_id_res = self.kmedoids.as_ref().unwrap().get_center_id(cluster_rank);
                if c_id_res.is_err() {
                    log::error!("cannot get center of cluster n° : {}", cluster_rank);
                    std::process::exit(1);
                }
                let c_id = c_id_res.unwrap();
                map_to_medoid.insert(id, c_id);
                dispatching_cost += d as f64;
            }
        }
        println!(
            "\n end of dispatching all data to their cluster, global cost : {:.3e}, cost per data : {:.3e}",
            dispatching_cost,
            dispatching_cost / nb_total_data as f64
        );
        //
        // dump clusters DataId info
        //
        self.ids_to_cluster = Some(map_to_medoid);
        let _ = self.dump_clusters();
        //
        log::info!(
            "\n  ClusterCoreset::dispatch sys time(ms) {:?} cpu time(ms) {:?}",
            sys_now.elapsed().unwrap().as_millis(),
            cpu_start.elapsed().as_millis()
        );
    } // end of dispatch

    //

    /// Dumps in a file, for each DataId, the DataId of the corresponding cluster center. If Ok, returns the number of records dumped.  
    /// The dump is a csv file named *clustercoreset.csv*.
    /// Each line consists of 2 DataId: the first one identifies a data point and the second is the DataId of the center of its cluster.  
    /// This function requires [dispatch][Self::dispatch()] to have been called previously.
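    ///
    /// Illustrative content of the dumped file for `DataId = usize` (values are made up):
    /// ```text
    /// 15,3456
    /// 16,3456
    /// 17,78
    /// ```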
    pub fn dump_clusters(&self) -> anyhow::Result<usize> {
        //
        let name = String::from("clustercoreset.csv");
        let file = std::fs::File::create(&name)?;
        let mut bufw = std::io::BufWriter::new(file);
        let mut nb_record = 0;
        //
        if self.ids_to_cluster.is_none() {
            log::error!(
                "ClusterCoreset::dump_clusters: the method dispatch should have been called before"
            );
            return Err(anyhow::anyhow!(
                "ClusterCoreset::dump_clusters: the method dispatch should have been called before"
            ));
        }
        for (d, c) in self.ids_to_cluster.as_ref().unwrap() {
            writeln!(bufw, "{:?},{:?}", d, c).unwrap();
            nb_record += 1;
        }
        bufw.flush().unwrap();
        log::info!(
            "clustercoreset, dumping cluster info in file {:?} , nb_record : {:?} ",
            name,
            nb_record
        );
        //
        Ok(nb_record)
    }

    /// Uses the iterator to return a block of at most buffer_size data items.
    /// Returns Err when the iterator is exhausted and no data could be collected.
    fn get_buffer_data(
        &self,
        buffer_size: usize,
        data_iter: &mut impl Iterator<Item = (DataId, Vec<T>)>,
    ) -> Result<Vec<(DataId, Vec<T>)>, u32> {
        //
        let mut ids_datas = Vec::<(DataId, Vec<T>)>::with_capacity(buffer_size);
        //
        loop {
            let data_opt = data_iter.next();
            match data_opt {
                Some((id, data)) => {
                    // insert
                    ids_datas.push((id, data));
                    if ids_datas.len() == buffer_size {
                        break;
                    }
                }
                _ => {
                    break;
                }
            } // end match
        } // end loop
        //
        if !ids_datas.is_empty() {
            Ok(ids_datas)
        } else {
            Err(0)
        }
    } // end of get_buffer_data

    fn get_kmedoids(&mut self) -> &mut Kmedoid<DataId, T> {
        self.kmedoids.as_mut().unwrap()
    }
} // end of impl ClusterCoreset