use crate::clustering::{Centers, Clustering};
use crate::space::{ColoredMetric, Point};
use crate::types::{CenterIdx, Distance, PointCount};
mod buckets;
use buckets::put_into_buckets;
mod flow;
use flow::{add_edge, initialize_state};
mod settle;
use settle::settle;
#[cfg(debug_assertions)]
use std::time;
#[cfg(debug_assertions)]
pub(crate) mod with_sorting;
use std::collections::HashMap;
use std::collections::VecDeque;
/// Position of an edge inside a single bucket (the type of the scan cursor in `make_private`).
type EdgeIdx = usize;
/// Position of a bucket in the bucket list; also the second component of a `PendingQueues` key.
type BucketIdx = usize;
/// An edge connecting a center (by index) with a point of the metric space.
///
/// The derived `PartialOrd` compares the fields in declaration order, i.e. by `d` first,
/// then by `left`, then by `right`. Do not reorder the fields without checking every
/// comparison of edges.
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub(crate) struct Edge<'a> {
    /// Distance between the center and the point.
    pub d: Distance,
    /// Index of the center this edge belongs to.
    left: CenterIdx,
    /// The point this edge leads to.
    right: &'a Point,
}
/// FIFO queues of postponed edges, one queue per `(center, bucket)` pair.
///
/// A key that has never been pushed to behaves exactly like an empty queue.
struct PendingQueues<'a> {
    /// Queue storage; entries are created lazily by [`PendingQueues::push`].
    queues: HashMap<(CenterIdx, BucketIdx), VecDeque<Edge<'a>>>,
}

impl<'a> PendingQueues<'a> {
    /// Creates a collection that holds no queues yet.
    fn new() -> PendingQueues<'a> {
        PendingQueues {
            queues: HashMap::new(),
        }
    }

    /// Appends `edge` to the back of the queue of `(c, b)`, creating the queue on first use.
    fn push(&mut self, c: CenterIdx, b: BucketIdx, edge: Edge<'a>) {
        // The entry API needs a single hash lookup, whether or not the queue already exists.
        self.queues.entry((c, b)).or_default().push_back(edge);
    }

    /// Removes and returns the oldest edge queued for `(c, b)`.
    ///
    /// Returns `None` if the queue is empty or has never been created.
    fn pop(&mut self, c: CenterIdx, b: BucketIdx) -> Option<Edge<'a>> {
        self.queues.get_mut(&(c, b))?.pop_front()
    }

    /// Returns `true` if no edge is queued for `(c, b)` (including the case of a missing queue).
    fn is_empty(&self, c: CenterIdx, b: BucketIdx) -> bool {
        self.queues
            .get(&(c, b))
            .map_or(true, |queue| queue.is_empty())
    }

    /// Drops all edges queued for `(c, b)`; does nothing if the queue does not exist.
    fn clear(&mut self, c: CenterIdx, b: BucketIdx) {
        if let Some(queue) = self.queues.get_mut(&(c, b)) {
            queue.clear();
        }
    }
}
/// Runs phase 2 ("make private") using distance buckets and returns one clustering per center.
///
/// What this function does, step by step:
/// 1. Creates one [`Edge`] for every (center, point) pair, carrying their distance.
/// 2. Hands all edges to `put_into_buckets`.
/// 3. For every center index `i` in `0..k` (with `k = centers.m()`), feeds edges into the flow
///    state, bucket by bucket, until `state.max_flow` reaches `(i + 1) * privacy_bound`. It then
///    calls `settle` and stores the returned clustering. In round `i` only edges whose center
///    index is at most `i` are inserted; the others are parked in [`PendingQueues`] and inserted
///    once their center's round starts.
/// 4. Calls `fill_up` on every clustering.
///
/// In debug builds the duration of each step is printed to stdout.
///
/// # Arguments
/// * `space` - supplies the points (`point_iter`), their number (`n`) and distances (`dist`).
/// * `privacy_bound` - round `i` requires a flow of `(i + 1) * privacy_bound`.
/// * `centers` - the centers; an edge's `left` index is the position of its center in
///   `centers.get_all(space)`.
/// * `thread_count` - forwarded to `put_into_buckets`.
///
/// # Returns
/// The clusterings produced by `settle`, in increasing order of the center index `i`.
///
/// # Panics
/// In debug builds the `debug_assert!`s fire when the buckets run out before the required flow
/// is reached. NOTE(review): in release builds the following `buckets[j]` access is expected to
/// panic instead; this depends on the container returned by `put_into_buckets`.
pub(crate) fn make_private<M: ColoredMetric>(
    space: &M,
    privacy_bound: PointCount,
    centers: &Centers,
    thread_count: usize,
) -> Vec<Clustering> {
    #[cfg(debug_assertions)]
    println!(" - Phase 2 with buckets:");
    #[cfg(debug_assertions)]
    let started_at = time::Instant::now();

    // Step 1: one edge per (center, point) pair.
    let num_centers = centers.m();
    let mut edges: Vec<Edge> = Vec::with_capacity(num_centers * space.n());
    for (center_idx, center) in centers.get_all(space).iter().enumerate() {
        for point in space.point_iter() {
            edges.push(Edge {
                d: space.dist(center, point),
                left: center_idx as CenterIdx,
                right: point,
            });
        }
    }

    #[cfg(debug_assertions)]
    let edges_built_at = time::Instant::now();
    #[cfg(debug_assertions)]
    println!(
        " - creation of {} edges takes: {:?}.",
        space.n() * num_centers,
        edges_built_at.duration_since(started_at)
    );

    // Step 2: distribute the edges over buckets.
    // NOTE(review): presumably buckets are ordered by increasing distance; confirm in `buckets`.
    let power_of_k: u32 = 2;
    let mut buckets = put_into_buckets(
        &mut edges,
        space.n(),
        num_centers,
        power_of_k,
        thread_count,
    );

    #[cfg(debug_assertions)]
    let bucketed_at = time::Instant::now();
    #[cfg(debug_assertions)]
    println!(
        " - putting {} edges into {} bucket with {} threads takes: {:?}.",
        space.n() * num_centers,
        buckets.len(),
        thread_count,
        bucketed_at.duration_since(edges_built_at)
    );

    // Step 3: settle the centers one after another.
    let mut clusterings: Vec<Clustering> = Vec::with_capacity(num_centers);
    let mut pending = PendingQueues::new();
    let mut state = initialize_state(space.n(), num_centers, privacy_bound == 0);

    let mut round: CenterIdx = 0; // center index currently being settled
    let mut bucket = 0; // bucket currently being scanned
    let mut cursor: EdgeIdx = 0; // next unread edge inside `buckets[bucket]`
    while round < num_centers {
        debug_assert!(bucket < buckets.len());

        // Edges of this center that were parked while scanning earlier buckets can go in now.
        for earlier in 0..bucket {
            while let Some(edge) = pending.pop(round, earlier) {
                debug_assert_eq!(round, edge.left);
                add_edge(edge, round, num_centers, privacy_bound, &mut state);
            }
        }

        // Keep reading edges until the flow required for this round has been reached.
        while state.max_flow < (round + 1) * privacy_bound {
            debug_assert!(
                bucket < buckets.len(),
                "For i = {} no flow of value (i + 1) * privacy_bound = {} found! Panic!",
                round,
                (round + 1) * privacy_bound
            );
            if cursor < buckets[bucket].len() {
                let edge = buckets[bucket][cursor];
                if edge.left <= round {
                    add_edge(edge, round, num_centers, privacy_bound, &mut state);
                } else {
                    // The edge belongs to a center of a later round: park it.
                    pending.push(edge.left, bucket, edge);
                }
                cursor += 1;
            } else {
                // Current bucket exhausted: continue with the first edge of the next one.
                bucket += 1;
                debug_assert!(
                    bucket < buckets.len(),
                    "All buckets have been processed but still not all radii have been settled!"
                );
                cursor = 0;
            }
        }

        let clustering = settle(
            cursor,
            buckets[bucket],
            round,
            privacy_bound,
            &mut state,
            centers,
            space,
        );
        clusterings.push(clustering);

        // The current bucket is scanned again from its first edge in the next round, so the
        // edges parked from it are dropped here.
        cursor = 0;
        for center in round..num_centers {
            pending.clear(center, bucket);
        }
        round += 1;
    }

    #[cfg(debug_assertions)]
    let flow_done_at = time::Instant::now();
    #[cfg(debug_assertions)]
    println!(
        " - solving flow problems and settle all {} centers takes: {:?}.",
        num_centers,
        flow_done_at.duration_since(bucketed_at)
    );

    // Step 4: let every clustering fill itself up via `fill_up`.
    for clustering in &mut clusterings {
        clustering.fill_up(space);
    }

    #[cfg(debug_assertions)]
    let filled_up_at = time::Instant::now();
    #[cfg(debug_assertions)]
    println!(
        " - filling up all remaining points takes: {:?}.",
        filled_up_at.duration_since(flow_done_at)
    );

    clusterings
}