sochdb-vector 2.0.6

Streaming elimination vector search engine for SochDB - CPU-first ANN with RDF + BPS
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
//! RDF (Rare-Dominant Fingerprint) builder and posting list handling.
//!
//! RDF uses IR-style inverted lists over a sparse fingerprint of each vector.
//! Posting lists are stored in VID-striped chunks for cache-friendly scoring.

use crate::config::RdfConfig;
use crate::segment::format::PostingListEntry;
use crate::types::*;
use std::collections::HashMap;

/// Builder for RDF posting lists.
///
/// `RdfBuilder::new` derives per-dimension statistics (document frequencies
/// and weights) from the input vectors; `RdfBuilder::build` then serializes
/// the posting-list directory and the concatenated posting data.
pub struct RdfBuilder<'a> {
    /// RDF configuration; this builder reads `top_t`, `stripe_shift`,
    /// `stop_dim_threshold`, `idf_weight` and `var_weight` from it.
    config: &'a RdfConfig,
    /// Number of dimensions; sizes every per-dimension table.
    dim: u32,
    /// Input vectors, indexed by vector id (the constructor calls them
    /// "rotated" — presumably already transformed upstream; confirm with caller).
    vectors: &'a [Vec<f32>],
    /// Per-dimension weight `idf_weight * ln(N / df) + var_weight * std_dev`.
    dim_weights: Vec<f32>,
    /// Per-dimension count of vectors whose `top_t` largest components
    /// (by absolute value) include that dimension.
    doc_freqs: Vec<u32>,
}

impl<'a> RdfBuilder<'a> {
    /// Create a new RDF builder.
    ///
    /// Makes a single pass over `rotated_vectors` and gathers, per dimension:
    /// - the document frequency `df[d]`: how many vectors have dimension `d`
    ///   among their `config.top_t` largest components by absolute value;
    /// - the sum and sum of squares of the component values, from which the
    ///   standard deviation is derived.
    ///
    /// The two are combined into the dimension weight
    /// `w[d] = idf_weight * ln(N / df[d]) + var_weight * std_dev[d]`.
    ///
    /// NaN components are never ranked, so they do not contribute to any
    /// document frequency. With no input vectors every weight is `0.0`
    /// (there is nothing to derive statistics from).
    ///
    /// # Panics
    ///
    /// Panics if any vector has more than `dim` components, because the
    /// per-dimension tables are indexed by component position.
    pub fn new(config: &'a RdfConfig, dim: u32, rotated_vectors: &'a [Vec<f32>]) -> Self {
        let n_vec = rotated_vectors.len();
        let dim_usize = dim as usize;
        let top_t = config.top_t as usize;

        // Per-dimension accumulators (f64 to limit rounding error in the sums).
        let mut sum = vec![0.0f64; dim_usize];
        let mut sum_sq = vec![0.0f64; dim_usize];
        let mut doc_freqs = vec![0u32; dim_usize];

        for vec in rotated_vectors {
            if top_t > 0 {
                // Rank components by magnitude; NaN cannot be ordered meaningfully
                // and is left out instead of aborting the build.
                let mut ranked: Vec<(usize, f32)> = vec
                    .iter()
                    .enumerate()
                    .map(|(i, &v)| (i, v.abs()))
                    .filter(|&(_, mag)| !mag.is_nan())
                    .collect();

                // Partitioning is only needed when something has to be dropped.
                if ranked.len() > top_t {
                    ranked.select_nth_unstable_by(top_t - 1, |a, b| b.1.total_cmp(&a.1));
                    ranked.truncate(top_t);
                }

                for &(dim_idx, _) in &ranked {
                    doc_freqs[dim_idx] += 1;
                }
            }

            for (i, &v) in vec.iter().enumerate() {
                // Widen before squaring so the square is not rounded to f32 first.
                let v = f64::from(v);
                sum[i] += v;
                sum_sq[i] += v * v;
            }
        }

        // Dimension weights: w[d] = α·idf[d] + β·sqrt(var[d])
        let dim_weights: Vec<f32> = if n_vec == 0 {
            // N = 0 would give 0/0 and ln(0); use neutral weights instead.
            vec![0.0; dim_usize]
        } else {
            let n = n_vec as f64;
            let idf_weight = config.idf_weight as f64;
            let var_weight = config.var_weight as f64;

            (0..dim_usize)
                .map(|d| {
                    let mean = sum[d] / n;
                    // `max(0.0)` absorbs small negative results caused by rounding
                    // (and turns a NaN variance into 0).
                    let var = (sum_sq[d] / n - mean * mean).max(0.0);
                    let std_dev = var.sqrt();

                    // IDF-like term ln(N / df); df is clamped to 1 so unused
                    // dimensions do not divide by zero.
                    let df = doc_freqs[d].max(1) as f64;
                    let idf = (n / df).ln();

                    (idf_weight * idf + var_weight * std_dev) as f32
                })
                .collect()
        };

        Self {
            config,
            dim,
            vectors: rotated_vectors,
            dim_weights,
            doc_freqs,
        }
    }

    /// Returns an owned copy of the per-dimension weights held by this builder.
    pub fn dim_weights(&self) -> Vec<f32> {
        self.dim_weights.to_vec()
    }

    /// Build posting lists with striped storage.
    ///
    /// For every vector the `top_t` dimensions with the highest
    /// `|v[d]| * w[d]` are selected; each selected component becomes one
    /// posting `(vid, sign, 7-bit magnitude)` in that dimension's list.
    /// Magnitudes are quantized relative to the largest selected magnitude
    /// seen in the same dimension. Components whose score is NaN are skipped.
    ///
    /// Returns `(directory, data)`:
    /// - `directory` has one `PostingListEntry` per dimension (offset into
    ///   `data`, number of postings, number of stripe chunks, flags);
    /// - `data` holds, per non-empty dimension, its stripe chunks in ascending
    ///   stripe id, each a `StripeChunkHeader` followed by its postings sorted
    ///   by vid-in-stripe.
    ///
    /// # Panics
    ///
    /// - if `stripe_shift > 8`: the vid-in-stripe is stored in a `u8`, so a
    ///   wider stripe would silently alias different vectors;
    /// - if a dimension needs more than `u16::MAX` stripe chunks, which the
    ///   directory entry cannot represent;
    /// - if any vector has more components than `dim`.
    pub fn build(&self) -> (Vec<PostingListEntry>, Vec<u8>) {
        let dim_usize = self.dim as usize;
        let top_t = self.config.top_t as usize;
        let stripe_shift = self.config.stripe_shift;

        assert!(
            stripe_shift <= 8,
            "stripe_shift {} exceeds 8: vid-in-stripe would not fit in a u8",
            stripe_shift
        );

        // Top-t `(dimension, signed value)` pairs of one vector, ranked by
        // |v[d]| * w[d]. `top_t == 0` selects nothing (and must not reach the
        // `top_t - 1` below).
        let select_top = |vec: &[f32]| -> Vec<(usize, f32)> {
            if top_t == 0 {
                return Vec::new();
            }
            let mut scored: Vec<(usize, f32, f32)> = vec
                .iter()
                .enumerate()
                .map(|(d, &v)| (d, v.abs() * self.dim_weights[d], v))
                .filter(|&(_, score, _)| !score.is_nan())
                .collect();

            if scored.len() > top_t {
                scored.select_nth_unstable_by(top_t - 1, |a, b| b.1.total_cmp(&a.1));
                scored.truncate(top_t);
            }

            scored.into_iter().map(|(d, _, v)| (d, v)).collect()
        };

        // Single pass: collect raw postings per dimension and track the largest
        // magnitude per dimension. Quantization is deferred until all vectors
        // have been seen, so the selection runs only once per vector.
        let mut dim_postings: Vec<Vec<(VectorId, f32)>> = vec![Vec::new(); dim_usize];
        let mut dim_max_mag = vec![0.0f32; dim_usize];

        for (vid, vec) in self.vectors.iter().enumerate() {
            for (dim_idx, value) in select_top(vec.as_slice()) {
                dim_max_mag[dim_idx] = dim_max_mag[dim_idx].max(value.abs());
                dim_postings[dim_idx].push((vid as VectorId, value));
            }
        }

        // Serialize the striped posting lists.
        let mut directory = Vec::with_capacity(dim_usize);
        let mut data = Vec::new();

        for (dim_idx, postings) in dim_postings.iter().enumerate() {
            let offset = data.len() as u64;

            if postings.is_empty() {
                directory.push(PostingListEntry {
                    offset,
                    length: 0,
                    num_stripes: 0,
                    flags: 0,
                });
                continue;
            }

            // Dimensions that appear in too many vectors are flagged as stop-dims.
            let is_stopword = self.doc_freqs[dim_idx] > self.config.stop_dim_threshold;
            let flags = if is_stopword {
                PostingListEntry::FLAG_STOPWORD
            } else {
                0
            };

            // Floor the scale so an all-zero dimension does not divide by zero.
            let max_mag = dim_max_mag[dim_idx].max(1e-10);

            // Group quantized postings by stripe.
            let mut stripes: HashMap<StripeId, Vec<(u8, bool, u8)>> = HashMap::new();
            for &(vid, value) in postings {
                let sign = value >= 0.0;
                let mag8 = ((value.abs() / max_mag) * 127.0).min(127.0) as u8;
                let stripe_id = vid >> stripe_shift;
                let vid_in_stripe = (vid & ((1 << stripe_shift) - 1)) as u8;
                stripes
                    .entry(stripe_id)
                    .or_default()
                    .push((vid_in_stripe, sign, mag8));
            }

            // Emit chunks in ascending stripe id.
            let mut chunks: Vec<(StripeId, Vec<(u8, bool, u8)>)> = stripes.into_iter().collect();
            chunks.sort_unstable_by_key(|chunk| chunk.0);

            assert!(
                chunks.len() <= u16::MAX as usize,
                "dimension {} spans {} stripes, more than a directory entry can record",
                dim_idx,
                chunks.len()
            );

            for (stripe_id, entries) in chunks.iter_mut() {
                let header = StripeChunkHeader {
                    stripe_id: *stripe_id,
                    // At most 2^stripe_shift <= 256 entries per chunk, so this fits.
                    count: entries.len() as u16,
                    _pad: 0,
                };
                data.extend_from_slice(bytemuck::bytes_of(&header));

                // Postings within a chunk are stored sorted by vid-in-stripe.
                entries.sort_by_key(|e| e.0);

                for &(vid_in_stripe, sign, mag) in entries.iter() {
                    let posting = RdfPosting::new(vid_in_stripe, sign, mag);
                    data.extend_from_slice(bytemuck::bytes_of(&posting));
                }
            }

            directory.push(PostingListEntry {
                offset,
                length: postings.len() as u32,
                num_stripes: chunks.len() as u16,
                flags,
            });
        }

        (directory, data)
    }
}

/// RDF scorer for query-time candidate generation.
///
/// Borrows a posting-list directory and its data (as produced by
/// `RdfBuilder::build`) and accumulates per-vector scores stripe by stripe.
pub struct RdfScorer<'a> {
    /// One entry per dimension: where its posting list lives in `rdf_data`.
    directory: &'a [PostingListEntry],
    /// Concatenated stripe chunks (header followed by postings).
    rdf_data: &'a [u8],
    /// Per-dimension weights; dimensions beyond its length are weighted 1.0
    /// when ranking query dimensions.
    dim_weights: &'a [f32],
    /// log2 of the number of vector ids per stripe.
    stripe_shift: u8,
    /// Number of vector ids per stripe (`1 << stripe_shift`).
    stripe_size: usize,
    /// Total number of vectors; ids at or above this are never returned.
    n_vec: u32,
}

impl<'a> RdfScorer<'a> {
    /// Create a new RDF scorer over an existing directory and posting data.
    ///
    /// `stripe_shift` is the log2 of the stripe width; the stripe size is
    /// derived from it here so it is not recomputed per query.
    pub fn new(
        directory: &'a [PostingListEntry],
        rdf_data: &'a [u8],
        dim_weights: &'a [f32],
        stripe_shift: u8,
        n_vec: u32,
    ) -> Self {
        let stripe_size: usize = 1 << stripe_shift;

        Self {
            directory,
            rdf_data,
            dim_weights,
            stripe_shift,
            stripe_size,
            n_vec,
        }
    }

    /// Score candidates using RDF.
    ///
    /// Picks the `top_t` query dimensions with the highest `|q[d]| * w[d]`
    /// (dimensions without a weight use `w = 1.0`), drops those outside the
    /// directory, and accumulates posting-list contributions for the rest.
    /// Stop-dims are ignored unless *every* selected dimension is a stop-dim,
    /// in which case they are all used so that such queries still get
    /// candidates.
    ///
    /// Query components whose score is NaN are ignored: they cannot be ranked
    /// and would turn every score they touch into NaN.
    ///
    /// Returns at most `l_a` candidates, best score first. The result is empty
    /// when the directory is empty, when `top_t` or `l_a` is 0, or when no
    /// query dimension is usable.
    pub fn score(&self, query: &[f32], top_t: usize, l_a: usize) -> Vec<ScoredCandidate> {
        if self.directory.is_empty() || top_t == 0 || l_a == 0 {
            return Vec::new();
        }

        // Rank query dimensions by |q[d]| * w[d].
        let mut ranked: Vec<(usize, f32, f32)> = query
            .iter()
            .enumerate()
            .map(|(d, &v)| {
                let w = self.dim_weights.get(d).copied().unwrap_or(1.0);
                (d, v.abs() * w, v)
            })
            .filter(|&(_, score, _)| !score.is_nan())
            .collect();

        if ranked.len() > top_t {
            ranked.select_nth_unstable_by(top_t - 1, |a, b| b.1.total_cmp(&a.1));
            ranked.truncate(top_t);
        }

        // Only dimensions that have a directory entry can be scored.
        ranked.retain(|&(d, _, _)| d < self.directory.len());
        if ranked.is_empty() {
            return Vec::new();
        }

        // Prefer the selected dimensions that are not stop-dims.
        let informative: Vec<(usize, f32, f32)> = ranked
            .iter()
            .copied()
            .filter(|&(d, _, _)| !self.directory[d].is_stopword())
            .collect();

        // If everything selected is a stop-dim, fall back to the unfiltered
        // selection instead of returning nothing (which would mean zero recall
        // for such queries); the dimension weights already down-weight them.
        let query_dims = if informative.is_empty() {
            &ranked
        } else {
            &informative
        };

        self.score_with_dims(query_dims, l_a)
    }

    /// Internal scoring with a given set of query dimensions.
    ///
    /// Separated from `score()` so the same accumulation can run on either the
    /// stop-dim-filtered or the unfiltered dimension set.
    ///
    /// `query_dims` holds `(dimension, rank score, query value)` triples; only
    /// the dimension and the query value are used here. Vectors are processed
    /// one stripe at a time; a vector becomes a candidate only if its
    /// accumulated score is strictly positive. Returns at most `l_a`
    /// candidates sorted by descending score (empty when `l_a` is 0).
    fn score_with_dims(
        &self,
        query_dims: &[(usize, f32, f32)],
        l_a: usize,
    ) -> Vec<ScoredCandidate> {
        // Nothing can be returned; also keeps `l_a - 1` below from underflowing.
        if l_a == 0 {
            return Vec::new();
        }

        // Use stripe-based accumulation
        let num_stripes = (self.n_vec as usize + self.stripe_size - 1) / self.stripe_size;
        let mut global_candidates = Vec::new();

        // One accumulator, reused and cleared for every stripe.
        let mut stripe_acc = vec![0.0f32; self.stripe_size];

        // NOTE(review): `accumulate_stripe` rescans a dimension's chunk headers
        // from the start for every stripe, so the total work grows with
        // stripes² per dimension; a per-dimension cursor would avoid that if
        // chunk headers are guaranteed to be in ascending stripe order.
        for stripe_id in 0..num_stripes as u32 {
            stripe_acc.fill(0.0);

            for &(dim_idx, _, q_value) in query_dims {
                let entry = match self.directory.get(dim_idx) {
                    Some(entry) if entry.length != 0 => entry,
                    _ => continue,
                };

                // Same default as in `score()`: a dimension without a stored
                // weight counts with weight 1.0 instead of panicking.
                let dim_weight = self.dim_weights.get(dim_idx).copied().unwrap_or(1.0);

                self.accumulate_stripe(entry, stripe_id, q_value, dim_weight, &mut stripe_acc);
            }

            // Collect strictly positive scores from this stripe.
            let base_vid = stripe_id << self.stripe_shift;
            for (i, &score) in stripe_acc.iter().enumerate() {
                if score > 0.0 {
                    let vid = base_vid + i as u32;
                    // The last stripe may extend past the real vector count.
                    if vid < self.n_vec {
                        global_candidates.push(ScoredCandidate { id: vid, score });
                    }
                }
            }
        }

        // Keep the best `l_a`, then order them best-first. Scores are all
        // positive and non-NaN here, so the total order matches the numeric one.
        if global_candidates.len() > l_a {
            global_candidates.select_nth_unstable_by(l_a - 1, |a, b| b.score.total_cmp(&a.score));
            global_candidates.truncate(l_a);
        }
        global_candidates.sort_by(|a, b| b.score.total_cmp(&a.score));

        global_candidates
    }

    /// Accumulate scores for a specific stripe from one dimension's posting list.
    ///
    /// Walks the dimension's stripe chunks starting at `entry.offset`, skipping
    /// chunks of other stripes, and for the chunk whose id equals
    /// `target_stripe_id` adds `q_value * sign * (magnitude / 127) * dim_weight`
    /// to `stripe_acc[vid_in_stripe]` for every posting.
    ///
    /// Malformed data is tolerated rather than trusted: the walk stops when a
    /// header or posting would run past the end of `rdf_data`, and postings
    /// whose vid-in-stripe does not fit in `stripe_acc` are ignored.
    fn accumulate_stripe(
        &self,
        entry: &PostingListEntry,
        target_stripe_id: StripeId,
        q_value: f32,
        dim_weight: f32,
        stripe_acc: &mut [f32],
    ) {
        let header_size = std::mem::size_of::<StripeChunkHeader>();
        let posting_size = std::mem::size_of::<RdfPosting>();
        let data_len = self.rdf_data.len();
        let mut offset = entry.offset as usize;

        for _ in 0..entry.num_stripes {
            // `checked_add` so a corrupt directory offset cannot overflow.
            let header_end = match offset.checked_add(header_size) {
                Some(end) if end <= data_len => end,
                _ => break,
            };

            // SAFETY: `offset + header_size <= rdf_data.len()` was checked just
            // above, so the read stays inside the slice; `read_unaligned` has no
            // alignment requirement. The builder in this file writes headers
            // with `bytemuck::bytes_of`.
            // NOTE(review): assumes any byte pattern is a valid
            // `StripeChunkHeader` — confirm the type is plain-old-data.
            let header: StripeChunkHeader =
                unsafe { std::ptr::read_unaligned(self.rdf_data.as_ptr().add(offset) as *const _) };
            offset = header_end;

            let count = header.count as usize;

            if header.stripe_id != target_stripe_id {
                // Not our stripe: jump over its postings. A jump past the end
                // is caught by the header check of the next iteration.
                offset += count * posting_size;
                continue;
            }

            for _ in 0..count {
                if offset + posting_size > data_len {
                    break;
                }

                // SAFETY: `offset + posting_size <= rdf_data.len()` was checked
                // just above; `read_unaligned` has no alignment requirement.
                // NOTE(review): assumes any byte pattern is a valid
                // `RdfPosting` — confirm the type is plain-old-data.
                let posting: RdfPosting = unsafe {
                    std::ptr::read_unaligned(self.rdf_data.as_ptr().add(offset) as *const _)
                };
                offset += posting_size;

                let vid_in_stripe = posting.vid_in_stripe as usize;
                let sign = if posting.sign() { 1.0 } else { -1.0 };
                let mag = posting.magnitude() as f32 / 127.0;

                // Score contribution: q_value * sign * mag * weight.
                // A vid outside the accumulator can only come from corrupt data.
                if let Some(slot) = stripe_acc.get_mut(vid_in_stripe) {
                    *slot += q_value * sign * mag * dim_weight;
                }
            }

            // Each stripe id is handled once per dimension; stop at the match.
            return;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Building over 100 vectors of 32 dims with `top_t = 8` must produce one
    /// directory entry per dimension and exactly 8 postings per vector.
    #[test]
    fn test_rdf_build() {
        let config = RdfConfig {
            top_t: 8,
            stripe_shift: 4, // 16 vids per stripe
            stop_dim_threshold: 1000,
            idf_weight: 0.5,
            var_weight: 0.5,
        };

        let vectors: Vec<Vec<f32>> = (0..100)
            .map(|i| {
                (0..32)
                    .map(|j| if j == (i % 32) { 1.0 } else { 0.1 })
                    .collect()
            })
            .collect();

        let builder = RdfBuilder::new(&config, 32, &vectors);
        let (directory, data) = builder.build();

        assert_eq!(directory.len(), 32);
        assert!(!data.is_empty());

        // Every vector has more than `top_t` components, so each one
        // contributes exactly `top_t` postings across all dimensions.
        let total_postings: u64 = directory.iter().map(|e| e.length as u64).sum();
        assert_eq!(total_postings, 100 * 8);
    }

    /// A one-hot query on dimension 0 must retrieve exactly the vectors that
    /// are one-hot on dimension 0 (ids 0, 16, 32, 48), best score first.
    #[test]
    fn test_rdf_scorer() {
        let config = RdfConfig {
            top_t: 4,
            stripe_shift: 4,
            stop_dim_threshold: 1000,
            idf_weight: 0.5,
            var_weight: 0.5,
        };

        // Vector i is one-hot on dimension i % 16.
        let vectors: Vec<Vec<f32>> = (0..50)
            .map(|i| {
                (0..16)
                    .map(|j| if j == (i % 16) { 1.0 } else { 0.0 })
                    .collect()
            })
            .collect();

        let builder = RdfBuilder::new(&config, 16, &vectors);
        let dim_weights = builder.dim_weights();
        let (directory, data) = builder.build();
        assert_eq!(directory.len(), 16);

        let scorer = RdfScorer::new(&directory, &data, &dim_weights, 4, 50);

        // Query matching vector 0 pattern
        let query: Vec<f32> = (0..16).map(|j| if j == 0 { 1.0 } else { 0.0 }).collect();
        let candidates = scorer.score(&query, 4, 10);

        assert!(!candidates.is_empty());
        assert!(candidates.len() <= 10);

        // Only vectors with a non-zero component on dimension 0 can score > 0.
        let mut ids: Vec<_> = candidates.iter().map(|c| c.id).collect();
        ids.sort_unstable();
        assert_eq!(ids, vec![0, 16, 32, 48]);

        // Results are ordered by descending score.
        assert!(candidates.windows(2).all(|w| w[0].score >= w[1].score));
    }
}