#![allow(
clippy::cast_possible_truncation,
clippy::cast_precision_loss,
clippy::cast_sign_loss,
clippy::many_single_char_names
)]
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use rayon::prelude::*;
use crate::Concurrency;
pub struct Corpus {
n: usize,
ndims: usize,
indptr: Vec<usize>,
dims: Vec<u32>,
wts: Vec<f64>,
maxw: Vec<f64>,
}
impl Corpus {
#[must_use]
pub fn len(&self) -> usize {
self.n
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.n == 0
}
#[must_use]
fn row(&self, i: usize) -> (&[u32], &[f64]) {
let (s, e) = (self.indptr[i], self.indptr[i + 1]);
(&self.dims[s..e], &self.wts[s..e])
}
#[cfg(all(target_os = "macos", feature = "gpu"))]
#[must_use]
pub fn csr_f32(&self) -> (Vec<u32>, Vec<u32>, Vec<f32>) {
let indptr = self.indptr.iter().map(|&x| x as u32).collect();
let wts = self.wts.iter().map(|&w| w as f32).collect();
(indptr, self.dims.clone(), wts)
}
#[must_use]
pub fn from_token_docs<S: AsRef<str>>(docs: &[Vec<S>]) -> Corpus {
let n = docs.len();
let mut dim: HashMap<&str, u32> = HashMap::new();
let mut df: Vec<u32> = Vec::new();
let mut doc_ids: Vec<Vec<u32>> = Vec::with_capacity(n);
for doc in docs {
let mut ids = Vec::with_capacity(doc.len());
let mut seen: HashSet<u32> = HashSet::new();
for tok in doc {
let id = *dim.entry(tok.as_ref()).or_insert_with(|| {
let i = df.len() as u32;
df.push(0);
i
});
ids.push(id);
if seen.insert(id) {
df[id as usize] += 1;
}
}
doc_ids.push(ids);
}
let idf: Vec<f64> = df.iter().map(|&d| (n as f64 / f64::from(d.max(1))).ln()).collect();
let rows: Vec<Vec<(u32, f64)>> = doc_ids
.iter()
.map(|ids| ids.iter().map(|&id| (id, idf[id as usize])).collect())
.collect();
Corpus::from_rows(&rows)
}
#[must_use]
pub fn from_rows(rows: &[Vec<(u32, f64)>]) -> Corpus {
let n = rows.len();
let normed: Vec<Vec<(u32, f64)>> = rows
.iter()
.map(|r| {
let mut m: HashMap<u32, f64> = HashMap::new();
for &(d, w) in r {
*m.entry(d).or_insert(0.0) += w;
}
let norm = m.values().map(|w| w * w).sum::<f64>().sqrt();
if norm > 0.0 {
m.into_iter().map(|(d, w)| (d, w / norm)).collect()
} else {
Vec::new()
}
})
.collect();
let mut maxw_orig: HashMap<u32, f64> = HashMap::new();
for v in &normed {
for &(d, w) in v {
let e = maxw_orig.entry(d).or_insert(0.0);
if w > *e {
*e = w;
}
}
}
let mut dims_sorted: Vec<(u32, f64)> = maxw_orig.into_iter().collect();
dims_sorted.sort_by(|a, b| {
a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal).then(a.0.cmp(&b.0))
});
let rank: HashMap<u32, u32> =
dims_sorted.iter().enumerate().map(|(i, &(d, _))| (d, i as u32)).collect();
let ndims = dims_sorted.len();
let maxw: Vec<f64> = dims_sorted.iter().map(|&(_, w)| w).collect();
let mut indptr = Vec::with_capacity(n + 1);
indptr.push(0);
let mut dims = Vec::new();
let mut wts = Vec::new();
for v in &normed {
let mut rv: Vec<(u32, f64)> = v.iter().map(|&(d, w)| (rank[&d], w)).collect();
rv.sort_unstable_by_key(|&(d, _)| d);
for (d, w) in rv {
dims.push(d);
wts.push(w);
}
indptr.push(dims.len());
}
Corpus { n, ndims, indptr, dims, wts, maxw }
}
}
#[must_use]
#[cfg_attr(feature = "profiling", inline(never))]
fn cos_full((da, wa): (&[u32], &[f64]), (db, wb): (&[u32], &[f64])) -> f64 {
debug_assert_eq!(da.len(), wa.len());
debug_assert_eq!(db.len(), wb.len());
let (la, lb) = (da.len(), db.len());
let (mut i, mut j) = (0usize, 0usize);
let mut s = 0.0f64;
while i < la && j < lb {
let (ai, bj) = unsafe { (*da.get_unchecked(i), *db.get_unchecked(j)) };
let (wai, wbj) = unsafe { (*wa.get_unchecked(i), *wb.get_unchecked(j)) };
let eq = f64::from(u32::from(ai == bj));
s += eq * wai * wbj;
i += usize::from(ai <= bj);
j += usize::from(ai >= bj);
}
s
}
#[derive(Clone, Copy)]
struct Bound {
pnorm: f64,
split: u32,
}
struct Cached {
bound: Vec<Bound>,
}
struct Scratch {
acc: Vec<f64>,
touched: Vec<u32>,
xpn: Vec<f64>,
}
#[must_use]
pub fn cosine_join(c: &Corpus, t: f64) -> Vec<(usize, usize, f64)> {
let n = c.n;
let mut index: Vec<Vec<(u32, f64)>> = vec![Vec::new(); c.ndims];
let mut cached = Cached { bound: vec![Bound { pnorm: 0.0, split: u32::MAX }; n] };
for i in 0..n {
let (di, wi) = c.row(i);
index_suffix(c, i, (di, wi), t, &mut index, &mut cached);
}
(0..n)
.into_par_iter()
.with_min_len(256)
.map_init(
|| Scratch { acc: vec![-1.0; n], touched: Vec::new(), xpn: Vec::new() },
|scratch, i| {
let (di, wi) = c.row(i);
accumulate(&index, (di, wi), i as u32, scratch);
let mut out = Vec::new();
verify_pruned(c, i, t, scratch, &cached, &mut out);
out
},
)
.flatten()
.collect()
}
#[must_use]
pub fn cosine_join_with(c: &Corpus, t: f64, mode: Concurrency) -> Vec<(usize, usize, f64)> {
#[cfg(all(feature = "gpu", target_os = "macos"))]
{
if matches!(mode, Concurrency::Cpu) {
return cosine_join(c, t);
}
let (indptr, dims, wts) = c.csr_f32();
let Some(gpu) = crate::simjoin_gpu::BatchCosineGpu::new(&indptr, &dims, &wts) else {
return cosine_join(c, t); };
match mode {
Concurrency::GpuPlusCpu => cosine_join_gpu(c, t, &gpu),
Concurrency::Gpu => cosine_join_gpu_f32(c, t, &gpu)
.into_iter()
.map(|(a, b, s)| (a, b, f64::from(s)))
.collect(),
Concurrency::Cpu => unreachable!("handled above"),
}
}
#[cfg(not(all(feature = "gpu", target_os = "macos")))]
{
let _ = mode; cosine_join(c, t)
}
}
pub struct CosineJoiner {
corpus: Corpus,
#[cfg(all(feature = "gpu", target_os = "macos"))]
gpu: Option<crate::simjoin_gpu::BatchCosineGpu>,
}
impl CosineJoiner {
#[must_use]
pub fn new(corpus: Corpus) -> Self {
#[cfg(all(feature = "gpu", target_os = "macos"))]
{
let (indptr, dims, wts) = corpus.csr_f32();
let gpu = crate::simjoin_gpu::BatchCosineGpu::new(&indptr, &dims, &wts);
Self { corpus, gpu }
}
#[cfg(not(all(feature = "gpu", target_os = "macos")))]
{
Self { corpus }
}
}
#[must_use]
pub fn corpus(&self) -> &Corpus {
&self.corpus
}
#[must_use]
pub fn has_gpu(&self) -> bool {
#[cfg(all(feature = "gpu", target_os = "macos"))]
{
self.gpu.is_some()
}
#[cfg(not(all(feature = "gpu", target_os = "macos")))]
{
false
}
}
#[must_use]
pub fn join(&self, t: f64, mode: Concurrency) -> Vec<(usize, usize, f64)> {
#[cfg(all(feature = "gpu", target_os = "macos"))]
{
match (mode, self.gpu.as_ref()) {
(Concurrency::GpuPlusCpu, Some(g)) => cosine_join_gpu(&self.corpus, t, g),
(Concurrency::Gpu, Some(g)) => cosine_join_gpu_f32(&self.corpus, t, g)
.into_iter()
.map(|(a, b, s)| (a, b, f64::from(s)))
.collect(),
_ => cosine_join(&self.corpus, t), }
}
#[cfg(not(all(feature = "gpu", target_os = "macos")))]
{
let _ = mode;
cosine_join(&self.corpus, t)
}
}
}
const PRUNE_SLACK: f64 = 1e-9;
const PREBOUND_MIN_DIMS: usize = 24;
#[cfg_attr(feature = "profiling", inline(never))]
fn accumulate(index: &[Vec<(u32, f64)>], (di, wi): (&[u32], &[f64]), cutoff: u32, s: &mut Scratch) {
let Scratch { acc, touched, .. } = s;
let acc = acc.as_mut_slice();
touched.clear();
touched.reserve(acc.len());
let tptr = touched.as_mut_ptr();
let mut tlen = 0usize;
for (&d, &w) in di.iter().zip(wi) {
for &(y, wy) in &index[d as usize] {
if y >= cutoff {
break;
}
let yu = y as usize;
let a = unsafe { acc.get_unchecked_mut(yu) };
let first = *a < 0.0;
let base = if first { 0.0 } else { *a };
*a = base + w * wy;
unsafe { *tptr.add(tlen) = y };
tlen += usize::from(first);
}
}
unsafe { touched.set_len(tlen) };
}
#[cfg_attr(feature = "profiling", inline(never))]
fn verify_pruned(
c: &Corpus,
i: usize,
t: f64,
s: &mut Scratch,
cached: &Cached,
out: &mut Vec<(usize, usize, f64)>,
) {
let (di, wi) = c.row(i);
s.xpn.clear();
s.xpn.push(0.0);
let mut sq = 0.0f64;
for &w in wi {
sq += w * w;
s.xpn.push(sq.sqrt());
}
let xnorm = sq.sqrt();
let prebound = di.len() >= PREBOUND_MIN_DIMS;
let need = t - PRUNE_SLACK;
let Scratch { acc, touched, xpn } = s;
for &y in touched.iter() {
let yu = y as usize;
let a = unsafe { std::mem::replace(acc.get_unchecked_mut(yu), -1.0) };
let bd = unsafe { *cached.bound.get_unchecked(yu) };
if prebound && a + xnorm * bd.pnorm < need {
continue;
}
let kstar = di.partition_point(|&d| d < bd.split);
let bound = a + unsafe { xpn.get_unchecked(kstar) } * bd.pnorm;
if bound >= need {
let cos = cos_full((di, wi), c.row(yu));
if cos >= t {
out.push((yu, i, cos));
}
}
}
}
#[cfg_attr(feature = "profiling", inline(never))]
fn index_suffix(
c: &Corpus,
i: usize,
(di, wi): (&[u32], &[f64]),
t: f64,
index: &mut [Vec<(u32, f64)>],
cached: &mut Cached,
) {
let mut rs = 0.0f64;
let mut b = 0usize;
for k in 0..di.len() {
let bound = wi[k] * c.maxw[di[k] as usize];
if rs + bound >= t {
break;
}
rs += bound;
b = k + 1;
}
let mut p = 0.0f64;
for &w in &wi[..b] {
p += w * w;
}
cached.bound[i] = Bound {
pnorm: p.sqrt(),
split: if b < di.len() { di[b] } else { u32::MAX },
};
for k in b..di.len() {
index[di[k] as usize].push((i as u32, wi[k]));
}
}
#[cfg(all(target_os = "macos", feature = "gpu"))]
const GPU_FILTER_MARGIN: f64 = 1e-4;
#[cfg(all(target_os = "macos", feature = "gpu"))]
fn collect_survivors(c: &Corpus, i: usize, t: f64, s: &mut Scratch, cached: &Cached, out: &mut Vec<(u32, u32)>) {
let (di, wi) = c.row(i);
s.xpn.clear();
s.xpn.push(0.0);
let mut sq = 0.0f64;
for &w in wi {
sq += w * w;
s.xpn.push(sq.sqrt());
}
let xnorm = sq.sqrt();
let prebound = di.len() >= PREBOUND_MIN_DIMS;
let need = t - PRUNE_SLACK;
let Scratch { acc, touched, xpn } = s;
for &y in touched.iter() {
let yu = y as usize;
let a = unsafe { std::mem::replace(acc.get_unchecked_mut(yu), -1.0) };
let bd = unsafe { *cached.bound.get_unchecked(yu) };
if prebound && a + xnorm * bd.pnorm < need {
continue;
}
let kstar = di.partition_point(|&d| d < bd.split);
let bound = a + unsafe { xpn.get_unchecked(kstar) } * bd.pnorm;
if bound >= need {
out.push((y, i as u32)); }
}
}
#[cfg(all(target_os = "macos", feature = "gpu"))]
#[must_use]
pub fn cosine_join_gpu(
c: &Corpus,
t: f64,
gpu: &crate::simjoin_gpu::BatchCosineGpu,
) -> Vec<(usize, usize, f64)> {
let (pa, pb) = survivor_pairs(c, t);
if pa.is_empty() {
return Vec::new();
}
let gcos = gpu.cosine_batch(&pa, &pb);
let need = t - GPU_FILTER_MARGIN;
(0..pa.len())
.into_par_iter()
.with_min_len(1024)
.filter_map(|k| {
if f64::from(gcos[k]) < need {
return None;
}
let (a, b) = (pa[k] as usize, pb[k] as usize);
let cos = cos_full(c.row(a), c.row(b));
(cos >= t).then_some((a, b, cos))
})
.collect()
}
#[cfg(all(target_os = "macos", feature = "gpu"))]
#[must_use]
pub fn cosine_join_gpu_f32(
c: &Corpus,
t: f64,
gpu: &crate::simjoin_gpu::BatchCosineGpu,
) -> Vec<(usize, usize, f32)> {
let (pa, pb) = survivor_pairs(c, t);
if pa.is_empty() {
return Vec::new();
}
let gcos = gpu.cosine_batch(&pa, &pb);
let tf = t as f32;
(0..pa.len())
.into_par_iter()
.with_min_len(1024)
.filter_map(|k| (gcos[k] >= tf).then_some((pa[k] as usize, pb[k] as usize, gcos[k])))
.collect()
}
#[cfg(all(target_os = "macos", feature = "gpu"))]
fn survivor_pairs(c: &Corpus, t: f64) -> (Vec<u32>, Vec<u32>) {
let n = c.n;
let mut index: Vec<Vec<(u32, f64)>> = vec![Vec::new(); c.ndims];
let mut cached = Cached { bound: vec![Bound { pnorm: 0.0, split: u32::MAX }; n] };
for i in 0..n {
let (di, wi) = c.row(i);
index_suffix(c, i, (di, wi), t, &mut index, &mut cached);
}
let pairs: Vec<(u32, u32)> = (0..n)
.into_par_iter()
.with_min_len(256)
.map_init(
|| Scratch { acc: vec![-1.0; n], touched: Vec::new(), xpn: Vec::new() },
|scratch, i| {
let (di, wi) = c.row(i);
accumulate(&index, (di, wi), i as u32, scratch);
let mut out = Vec::new();
collect_survivors(c, i, t, scratch, &cached, &mut out);
out
},
)
.flatten()
.collect();
pairs.into_iter().unzip()
}
#[cfg(feature = "profiling")]
#[must_use]
pub fn cosine_join_counts(c: &Corpus, t: f64) -> (u64, u64, u64) {
let n = c.n;
let mut index: Vec<Vec<(u32, f64)>> = vec![Vec::new(); c.ndims];
let mut cached = Cached { bound: vec![Bound { pnorm: 0.0, split: u32::MAX }; n] };
for i in 0..n {
let (di, wi) = c.row(i);
index_suffix(c, i, (di, wi), t, &mut index, &mut cached);
}
let mut s = Scratch { acc: vec![-1.0; n], touched: Vec::new(), xpn: Vec::new() };
let (mut ncand, mut survivors, mut pairs) = (0u64, 0u64, 0u64);
let need = t - PRUNE_SLACK;
for i in 0..n {
let (di, wi) = c.row(i);
accumulate(&index, (di, wi), i as u32, &mut s);
ncand += s.touched.len() as u64;
s.xpn.clear();
s.xpn.push(0.0);
let mut sq = 0.0f64;
for &w in wi {
sq += w * w;
s.xpn.push(sq.sqrt());
}
let xnorm = sq.sqrt();
let prebound = di.len() >= PREBOUND_MIN_DIMS;
for &y in &s.touched {
let yu = y as usize;
let a = std::mem::replace(&mut s.acc[yu], -1.0);
let bd = cached.bound[yu];
if prebound && a + xnorm * bd.pnorm < need {
continue;
}
let kstar = di.partition_point(|&d| d < bd.split);
if a + s.xpn[kstar] * bd.pnorm >= need {
survivors += 1;
if cos_full((di, wi), c.row(yu)) >= t {
pairs += 1;
}
}
}
}
(ncand, survivors, pairs)
}
#[must_use]
pub fn cosine_join_bruteforce(c: &Corpus, t: f64) -> Vec<(usize, usize, f64)> {
let mut out: Vec<(usize, usize, f64)> = Vec::new();
for i in 0..c.n {
for j in 0..i {
let s = cos_full(c.row(i), c.row(j));
if s >= t {
out.push((j, i, s));
}
}
}
out
}
#[cfg(test)]
mod tests {
use super::{cosine_join, cosine_join_bruteforce, Corpus};
fn xorshift(seed: u64) -> impl FnMut() -> u64 {
let mut s = seed;
move || {
s ^= s << 13;
s ^= s >> 7;
s ^= s << 17;
s
}
}
fn sort_pairs(mut v: Vec<(usize, usize, f64)>) -> Vec<(usize, usize, u64)> {
v.sort_by_key(|a| (a.0, a.1));
v.into_iter().map(|(a, b, s)| (a, b, s.to_bits())).collect()
}
#[test]
fn indexed_join_matches_bruteforce() {
let mut next = xorshift(0x9e37_79b9_7f4a_7c15);
for _ in 0..400 {
let n = (next() % 40 + 2) as usize;
let dim_space = next() % 15 + 1;
let rows: Vec<Vec<(u32, f64)>> = (0..n)
.map(|_| {
let nnz = (next() % 8) as usize;
(0..nnz)
.map(|_| ((next() % dim_space) as u32, (next() % 10 + 1) as f64))
.collect()
})
.collect();
let c = Corpus::from_rows(&rows);
for &t in &[0.1_f64, 0.25, 0.5, 0.75, 0.9, 1.0] {
let got = sort_pairs(cosine_join(&c, t));
let want = sort_pairs(cosine_join_bruteforce(&c, t));
assert_eq!(got, want, "n={n} t={t}");
}
}
}
#[cfg(all(feature = "gpu", target_os = "macos"))]
#[test]
fn gpu_hybrid_matches_cpu() {
use super::CosineJoiner;
use crate::Concurrency;
let mut next = xorshift(0x1357_9bdf_0246_8ace);
for _ in 0..60 {
let n = (next() % 60 + 4) as usize;
let dim_space = next() % 20 + 2;
let rows: Vec<Vec<(u32, f64)>> = (0..n)
.map(|_| {
let nnz = (next() % 10) as usize;
(0..nnz)
.map(|_| ((next() % dim_space) as u32, (next() % 10 + 1) as f64))
.collect()
})
.collect();
let joiner = CosineJoiner::new(Corpus::from_rows(&rows));
if !joiner.has_gpu() {
eprintln!("no Metal device — skipping gpu_hybrid_matches_cpu");
return;
}
for &t in &[0.1_f64, 0.3, 0.5, 0.7, 0.9, 1.0] {
let want = sort_pairs(cosine_join(joiner.corpus(), t));
assert_eq!(
sort_pairs(joiner.join(t, Concurrency::GpuPlusCpu)),
want,
"GpuPlusCpu n={n} t={t}"
);
assert_eq!(sort_pairs(joiner.join(t, Concurrency::Cpu)), want, "Cpu n={n} t={t}");
}
}
}
}