#[cfg(feature = "parallel")]
use crate::arrayadapter::ArrayAdapter;
use crate::fasterpam::{update_removal_loss, update_second_nearest};
use crate::util::*;
use core::ops::AddAssign;
use ndarray::Array;
use num_traits::{Signed, Zero};
use rayon::prelude::*;
use std::convert::From;
use std::sync::{Arc, Mutex};
pub fn par_fasterpam<M, N, L>(
mat: &M,
med: &mut Vec<usize>,
maxiter: usize,
rng: &mut impl rand::Rng,
) -> (L, Vec<usize>, usize, usize)
where
N: Zero + PartialOrd + Copy + Sync + Send,
L: AddAssign + Signed + Zero + PartialOrd + Copy + From<N> + Sync + Send,
M: ArrayAdapter<N> + Sync + Send,
{
let (n, k) = (mat.len(), med.len());
if k == 1 {
let assi = vec![0; n];
let (swapped, loss) = choose_medoid_within_partition::<M, N, L>(mat, &assi, med, 0);
return (loss, assi, 1, if swapped { 1 } else { 0 });
}
let (mut loss, mut data) = par_initial_assignment(mat, med);
debug_assert_assignment(mat, med, &data);
let mut removal_loss = vec![L::zero(); k];
update_removal_loss(&data, &mut removal_loss);
let (mut lastswap, mut n_swaps, mut iter) = (n, 0, 0);
let seq = rand::seq::index::sample(rng, n, n); while iter < maxiter {
iter += 1;
let (swaps_before, lastloss) = (n_swaps, loss);
for j in seq.iter() {
if j == lastswap {
break;
}
if j == med[data[j].near.i as usize] {
continue; }
let (change, b) = par_find_best_swap(mat, &removal_loss, &data, j);
if change >= L::zero() {
continue; }
n_swaps += 1;
lastswap = j;
loss = par_do_swap(mat, med, &mut data, b, j);
update_removal_loss(&data, &mut removal_loss);
}
if n_swaps == swaps_before || loss >= lastloss {
break; }
}
let assi = data.iter().map(|x| x.near.i as usize).collect();
(loss, assi, iter, n_swaps)
}
#[inline]
fn par_initial_assignment<M, N, L>(mat: &M, med: &[usize]) -> (L, Vec<Rec<N>>)
where
N: Zero + PartialOrd + Copy + Send + Sync,
L: AddAssign + Zero + PartialOrd + Copy + From<N> + Send + Sync,
M: ArrayAdapter<N> + Sync,
{
let n = mat.len();
let k = med.len();
assert!(mat.is_square(), "Dissimilarity matrix is not square");
assert!(n <= u32::MAX as usize, "N is too large");
assert!(k > 0 && k < u32::MAX as usize, "invalid N");
assert!(k <= n, "k must be at most N");
let mut data = vec![Rec::<N>::empty(); mat.len()];
let firstcenter = med[0];
let loss = data
.par_iter_mut()
.enumerate()
.map(|(i, cur)| {
*cur = Rec::new(0, mat.get(i, firstcenter), u32::MAX, N::zero());
for (m, &me) in med.iter().enumerate().skip(1) {
let d = mat.get(i, me);
if d < cur.near.d || i == me {
cur.seco = cur.near;
cur.near = DistancePair { i: m as u32, d };
} else if cur.seco.i == u32::MAX || d < cur.seco.d {
cur.seco = DistancePair { i: m as u32, d };
}
}
L::from(cur.near.d)
})
.reduce_with(L::add)
.unwrap();
(loss, data)
}
#[inline]
fn par_find_best_swap<M, N, L>(mat: &M, removal_loss: &[L], data: &[Rec<N>], j: usize) -> (L, usize)
where
N: Zero + PartialOrd + Copy + Sync + Send,
L: AddAssign + Signed + Zero + PartialOrd + Copy + From<N> + Sync + Send,
M: ArrayAdapter<N> + Sync + Send,
{
let n = mat.len();
let length: usize = removal_loss.len();
let mut ploss = Array::from_vec(removal_loss.to_vec());
let mut acc = L::zero();
rayon::scope(|s| {
let parts = rayon::current_num_threads();
let stepsize = (n + parts - 1) / parts; let mutex = Arc::new(Mutex::new((&mut ploss, &mut acc)));
for x in 0..parts {
let mutex = Arc::clone(&mutex);
s.spawn(move |_| {
let mut loss = Array::zeros(length);
let mut lagg = L::zero();
let start = x * stepsize;
let end = usize::min(start + stepsize, n);
for o in start..end {
let reco = &data[o];
let doj = mat.get(o, j);
if doj < reco.near.d {
lagg += L::from(doj) - L::from(reco.near.d);
loss[reco.near.i as usize] += L::from(reco.near.d) - L::from(reco.seco.d);
} else if doj < reco.seco.d {
loss[reco.near.i as usize] += L::from(doj) - L::from(reco.seco.d);
}
}
let mut mutex = mutex.lock().unwrap();
*mutex.0 += &loss;
*mutex.1 += lagg;
})
}
});
let (b, bloss): (usize, L) = find_min(&mut ploss.iter());
(bloss + acc, b) }
#[inline]
fn par_do_swap<M, N, L>(
mat: &M,
med: &mut Vec<usize>,
data: &mut Vec<Rec<N>>,
b: usize,
j: usize,
) -> L
where
N: Zero + PartialOrd + Copy + Send + Sync,
L: AddAssign + Signed + Zero + PartialOrd + Copy + From<N> + Send + Sync,
M: ArrayAdapter<N> + Sync,
{
let n = mat.len();
assert!(b < med.len(), "invalid medoid number");
assert!(j < n, "invalid object number");
med[b] = j;
data.par_iter_mut()
.enumerate()
.map(|(o, reco)| {
if o == j {
if reco.near.i != b as u32 {
reco.seco = reco.near;
}
reco.near = DistancePair::new(b as u32, N::zero());
return L::zero();
}
let doj = mat.get(o, j);
if reco.near.i == b as u32 {
if doj < reco.seco.d {
reco.near = DistancePair::new(b as u32, doj);
} else {
reco.near = reco.seco;
reco.seco = update_second_nearest(mat, med, reco.near.i as usize, b, o, doj);
}
} else {
if doj < reco.near.d {
reco.seco = reco.near;
reco.near = DistancePair::new(b as u32, doj);
} else if reco.seco.i == b as u32 {
reco.seco = update_second_nearest(mat, med, reco.near.i as usize, b, o, doj);
} else if doj < reco.seco.d {
reco.seco = DistancePair::new(b as u32, doj);
}
}
L::from(reco.near.d)
})
.reduce_with(L::add)
.unwrap()
}
#[cfg(test)]
mod tests {
use crate::{arrayadapter::LowerTriangle, par_fasterpam, par_silhouette, util::assert_array};
use rand::{rngs::StdRng, SeedableRng};
#[test]
fn test_fasterpam_par() {
let data = LowerTriangle {
n: 5,
data: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 1],
};
let mut meds = vec![0, 1];
let mut rng = StdRng::seed_from_u64(1);
let (loss, assi, n_iter, n_swap): (i64, _, _, _) =
par_fasterpam(&data, &mut meds, 10, &mut rng);
let sil: f64 = par_silhouette(&data, &assi);
assert_eq!(loss, 4, "loss not as expected");
assert_eq!(n_swap, 1, "swaps not as expected");
assert_eq!(n_iter, 2, "iterations not as expected");
assert_array(assi, vec![0, 0, 0, 1, 1], "assignment not as expected");
assert_array(meds, vec![0, 4], "medoids not as expected");
assert_eq!(sil, 0.7522494172494172, "Silhouette not as expected");
}
}