#![cfg(all(test, feature = "persistence"))]
use crate::storage::PayloadStorage;
use crate::{distance::DistanceMetric, point::Point, quantization::StorageMode, Collection};
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;
/// Upserting 128 points into a product-quantized collection must leave a
/// trained quantizer behind and a PQ cache entry for every upserted point.
#[test]
fn test_upsert_product_quantization_after_training_backfills_cache() {
    let temp_dir = tempfile::tempdir().expect("temp dir should be created");
    let collection = Collection::create_with_options(
        temp_dir.path().to_path_buf(),
        16,
        DistanceMetric::Cosine,
        StorageMode::ProductQuantization,
    )
    .expect("collection should be created");

    // Deterministic sine-based vectors, scaled to unit length when non-zero.
    let mut points: Vec<Point> = Vec::with_capacity(128);
    for id in 0u64..128 {
        // The id contribution is the same for every dimension of this point.
        let id_term = f32::from(u16::try_from(id + 1).expect("id fits in u16")) * 0.17;
        let mut vector: Vec<f32> = Vec::with_capacity(16);
        for d in 0..16 {
            let d_term = f32::from(u16::try_from(d).expect("dimension index fits in u16")) * 0.11;
            vector.push((id_term + d_term).sin());
        }
        let norm = vector.iter().map(|x| x * x).sum::<f32>().sqrt();
        if norm > 0.0 {
            vector.iter_mut().for_each(|x| *x /= norm);
        }
        points.push(Point::without_payload(id, vector));
    }
    collection.upsert(points).expect("upsert should succeed");

    let quantizer_trained = collection.pq_quantizer.read().is_some();
    assert!(
        quantizer_trained,
        "quantizer should be trained after reaching sample threshold"
    );
    let cached_entries = collection.pq_cache.read().len();
    assert_eq!(
        cached_entries, 128,
        "all training samples should be backfilled in PQ cache"
    );
}
// Smoke test: four threads interleave single-point upserts with searches on one
// shared collection; every thread must run to completion without panicking.
#[test]
fn test_concurrent_upsert_and_search_no_deadlock() {
let temp_dir = tempfile::tempdir().expect("temp dir should be created");
// The collection is wrapped in an Arc so each worker thread can own a handle.
let col = Arc::new(
Collection::create(PathBuf::from(temp_dir.path()), 4, DistanceMetric::Cosine)
.expect("collection should be created"),
);
// Seed 20 points (ids 0..20) before any worker starts.
#[allow(clippy::cast_precision_loss)] let seeds: Vec<Point> = (0u64..20)
.map(|i| Point::without_payload(i, vec![i as f32 / 20.0, 0.1, 0.1, 0.1]))
.collect();
col.upsert(seeds).expect("seed upsert should succeed");
// Spawn 4 workers; worker `t` performs 50 upsert+search rounds.
let handles: Vec<_> = (0u64..4)
.map(|t| {
let col = Arc::clone(&col);
thread::spawn(move || {
for i in 0u64..50 {
// Ids are t * 1000 + i, so workers other than t = 0 never collide with the
// seeds or each other; worker 0 overwrites seed ids 0..20.
let id = t * 1_000 + i;
#[allow(clippy::cast_precision_loss)] col.upsert(vec![Point::without_payload(
id,
vec![i as f32 / 50.0, 0.2, 0.2, 0.2],
)])
.expect("concurrent upsert should not fail");
// The search outcome (Ok or Err) is discarded; only a panic fails the worker.
let _ = col.search(&[0.5_f32, 0.1, 0.1, 0.1], 5);
}
})
})
.collect();
// NOTE(review): `join` only reports a panic in the worker; a genuine deadlock
// would block here forever rather than trip this `expect`.
for h in handles {
h.join()
.expect("thread panicked — possible deadlock or data race");
}
}
/// A point carrying a default (`""`) and a named (`"title"`) sparse vector must
/// appear in both sparse indexes after `upsert`, each with one document and one
/// posting for the probed term.
#[test]
fn test_upsert_indexes_sparse_vectors() {
    use crate::index::sparse::SparseVector;

    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();

    let mut sparse_by_name = BTreeMap::new();
    sparse_by_name.insert(String::new(), SparseVector::new(vec![(1, 1.0), (2, 0.5)]));
    sparse_by_name.insert(
        "title".to_string(),
        SparseVector::new(vec![(10, 2.0), (20, 1.0)]),
    );
    let point = Point::with_sparse(1, vec![0.1, 0.2, 0.3, 0.4], None, Some(sparse_by_name));
    coll.upsert(vec![point]).unwrap();

    let indexes = coll.sparse_indexes().read();
    assert!(
        indexes.contains_key(""),
        "Default sparse index should be created"
    );
    assert!(
        indexes.contains_key("title"),
        "Named sparse index 'title' should be created"
    );

    // (index name, term id to probe): default index first, then the named one.
    for (index_name, term_id) in [("", 1), ("title", 10)] {
        let idx = indexes.get(index_name).unwrap();
        assert_eq!(idx.doc_count(), 1);
        let postings = idx.get_all_postings(term_id);
        assert_eq!(postings.len(), 1);
        assert_eq!(postings[0].doc_id, 1);
    }
}
/// Deleting a point must also remove it from the default sparse index: the
/// document count drops back to 0 and the term's posting list becomes empty.
#[test]
fn test_delete_removes_from_sparse_indexes() {
    use crate::index::sparse::SparseVector;

    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();

    let mut sparse_by_name = BTreeMap::new();
    sparse_by_name.insert(String::new(), SparseVector::new(vec![(1, 1.0)]));
    coll.upsert(vec![Point::with_sparse(
        42,
        vec![0.1, 0.2, 0.3, 0.4],
        None,
        Some(sparse_by_name),
    )])
    .unwrap();

    // The read guard is a temporary here, so it is released before `delete`.
    let docs_before = coll.sparse_indexes().read().get("").unwrap().doc_count();
    assert_eq!(docs_before, 1);

    coll.delete(&[42]).unwrap();

    let indexes = coll.sparse_indexes().read();
    let default_idx = indexes.get("").unwrap();
    assert_eq!(default_idx.doc_count(), 0);
    assert!(default_idx.get_all_postings(1).is_empty());
}
/// Exercises a sparse term id at the top of the `u32` range (`u32::MAX - 1`)
/// through indexing, sparse search, and a flush/reopen cycle.
///
/// NOTE(review): the test name says "max" but the probed id is `u32::MAX - 1`;
/// presumably `u32::MAX` itself is reserved — confirm against the sparse index.
#[test]
fn test_u32_max_term_id() {
    use crate::index::sparse::search::sparse_search;
    use crate::index::sparse::SparseVector;

    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(dir.path().to_path_buf(), 4, DistanceMetric::Cosine).unwrap();
    let extreme_term = u32::MAX - 1;

    let mut sv_map = BTreeMap::new();
    sv_map.insert(String::new(), SparseVector::new(vec![(extreme_term, 1.5)]));
    let point = Point::with_sparse(1, vec![0.1, 0.2, 0.3, 0.4], None, Some(sv_map));
    coll.upsert(vec![point]).unwrap();

    // In-memory checks. The read guard is scoped so it is released before `flush`.
    {
        let indexes = coll.sparse_indexes().read();
        let idx = indexes.get("").unwrap();

        assert_eq!(idx.doc_count(), 1);
        let postings = idx.get_all_postings(extreme_term);
        assert_eq!(
            postings.len(),
            1,
            "term_id {extreme_term} must have one posting"
        );
        assert_eq!(postings[0].doc_id, 1);
        assert!((postings[0].weight - 1.5).abs() < f32::EPSILON);

        let query = SparseVector::new(vec![(extreme_term, 1.0)]);
        let results = sparse_search(idx, &query, 10);
        assert_eq!(
            results.len(),
            1,
            "search with extreme term_id must find the document"
        );
        assert_eq!(results[0].doc_id, 1);
    }

    // Persistence roundtrip: flush, reopen from the same directory, re-check.
    coll.flush().unwrap();
    let coll2 = Collection::open(dir.path().to_path_buf()).unwrap();
    let indexes = coll2.sparse_indexes().read();
    let idx = indexes.get("").unwrap();
    assert_eq!(
        idx.doc_count(),
        1,
        "doc_count must survive persistence roundtrip"
    );
    let postings = idx.get_all_postings(extreme_term);
    assert_eq!(
        postings.len(),
        1,
        "extreme term_id must survive persistence roundtrip"
    );
    assert_eq!(postings[0].doc_id, 1);
}
/// Upserting a point that carries a sparse vector must leave a non-empty
/// `sparse.wal` file in the collection directory.
#[test]
fn test_sparse_wal_written_on_upsert() {
    use crate::index::sparse::SparseVector;

    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();

    let mut sparse_by_name = BTreeMap::new();
    sparse_by_name.insert(String::new(), SparseVector::new(vec![(1, 1.0)]));
    coll.upsert(vec![Point::with_sparse(
        1,
        vec![0.1, 0.2, 0.3, 0.4],
        None,
        Some(sparse_by_name),
    )])
    .unwrap();

    let wal_path = dir.path().join("sparse.wal");
    assert!(wal_path.exists(), "Sparse WAL should be created on upsert");
    let wal_len = std::fs::metadata(&wal_path).unwrap().len();
    assert!(wal_len > 0, "Sparse WAL should have content");
}
/// A 200-point batch upsert must leave the collection searchable (k = 10 hits)
/// and report 200 points in its config.
#[test]
fn test_upsert_batch_produces_searchable_results() {
    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 16, DistanceMetric::Cosine).unwrap();

    let mut points: Vec<Point> = Vec::with_capacity(200);
    for i in 0u64..200 {
        #[allow(clippy::cast_precision_loss)]
        let v: Vec<f32> = (0..16).map(|d| (i as f32 + d as f32) * 0.01).collect();
        points.push(Point::without_payload(i, v));
    }
    coll.upsert(points).expect("batch upsert should succeed");

    // The query has the same components as point 0.
    #[allow(clippy::cast_precision_loss)]
    let query: Vec<f32> = (0..16).map(|d| d as f32 * 0.01).collect();
    let hits = coll.search(&query, 10).expect("search should succeed");
    assert_eq!(hits.len(), 10, "search should return k results");

    let stored = coll.config.read().point_count;
    assert_eq!(stored, 200);
}
// Wall-clock comparison: inserting the same 500 x 32-d points through `upsert`
// must take less than 8x the time of `upsert_bulk` on a fresh collection.
// NOTE(review): this is timing-based, so it may be sensitive to machine load.
#[test]
fn test_upsert_throughput_not_degraded_vs_bulk() {
let dim = 32;
let n = 500;
// First collection: timed `upsert` path. Point construction is outside the timer.
let dir1 = tempfile::tempdir().unwrap();
let coll1 = Collection::create(dir1.path().to_path_buf(), dim, DistanceMetric::Cosine).unwrap();
#[allow(clippy::cast_precision_loss)]
let points1: Vec<Point> = (0u64..n)
.map(|i| {
let v: Vec<f32> = (0..dim).map(|d| (i as f32 + d as f32) * 0.01).collect();
Point::without_payload(i, v)
})
.collect();
let t0 = std::time::Instant::now();
coll1.upsert(points1).expect("upsert should succeed");
let upsert_dur = t0.elapsed();
// Second collection: identical data, timed `upsert_bulk` path.
let dir2 = tempfile::tempdir().unwrap();
let coll2 = Collection::create(dir2.path().to_path_buf(), dim, DistanceMetric::Cosine).unwrap();
#[allow(clippy::cast_precision_loss)]
let points2: Vec<Point> = (0u64..n)
.map(|i| {
let v: Vec<f32> = (0..dim).map(|d| (i as f32 + d as f32) * 0.01).collect();
Point::without_payload(i, v)
})
.collect();
let t0 = std::time::Instant::now();
coll2
.upsert_bulk(&points2)
.expect("upsert_bulk should succeed");
let bulk_dur = t0.elapsed();
// The bulk duration is clamped to at least 1 ms so a near-zero denominator
// cannot inflate the ratio.
let ratio = upsert_dur.as_secs_f64() / bulk_dur.as_secs_f64().max(0.001);
assert!(
ratio < 8.0,
"upsert() is {ratio:.1}x slower than upsert_bulk() — \
expected <8x (upsert={upsert_dur:?}, bulk={bulk_dur:?})"
);
}
/// When one batch contains the same id several times, the last occurrence must
/// win for both vector and payload — including a trailing `None` payload, which
/// must clear a payload that existed before the batch.
#[test]
fn test_upsert_intra_batch_duplicate_ids_last_writer_wins() {
    // Builds the `{"v": <tag>}` payload used throughout this test.
    let tagged = |tag: &str| Some(serde_json::json!({ "v": tag }));

    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();

    // id=10 exists with a payload before the batch under test is applied.
    let preexisting = Point::new(
        10,
        vec![0.1, 0.2, 0.3, 0.4],
        Some(serde_json::json!({"pre": "existing"})),
    );
    coll.upsert(vec![preexisting]).unwrap();

    coll.upsert(vec![
        // id=1: payload A, then payload B.
        Point::new(1, vec![1.0, 0.0, 0.0, 0.0], tagged("A")),
        Point::new(1, vec![0.0, 1.0, 0.0, 0.0], tagged("B")),
        // id=10: payload X, then no payload.
        Point::new(10, vec![0.0, 0.0, 1.0, 0.0], tagged("X")),
        Point::new(10, vec![0.0, 0.0, 0.0, 1.0], None),
        // id=20: no payload, then payload C.
        Point::without_payload(20, vec![0.5, 0.5, 0.0, 0.0]),
        Point::new(20, vec![0.0, 0.5, 0.5, 0.0], tagged("C")),
        // id=30: appears once.
        Point::new(30, vec![0.0, 0.0, 0.5, 0.5], tagged("D")),
    ])
    .unwrap();

    let results = coll.get(&[1, 10, 20, 30]);
    assert_eq!(results.len(), 4);

    let p1 = results[0].as_ref().expect("id=1 should exist");
    assert_eq!(p1.payload, tagged("B"));
    assert_eq!(p1.vector, vec![0.0, 1.0, 0.0, 0.0]);

    let p10 = results[1]
        .as_ref()
        .expect("id=10 should still have a vector");
    assert!(p10.payload.is_none(), "payload should be None (deleted)");
    assert_eq!(p10.vector, vec![0.0, 0.0, 0.0, 1.0]);

    let p20 = results[2].as_ref().expect("id=20 should exist");
    assert_eq!(p20.payload, tagged("C"));
    assert_eq!(p20.vector, vec![0.0, 0.5, 0.5, 0.0]);

    let p30 = results[3].as_ref().expect("id=30 should exist");
    assert_eq!(p30.payload, tagged("D"));

    assert_eq!(coll.len(), 4, "should have 4 unique points");
}
/// Last-writer-wins for intra-batch duplicate ids must still hold after the
/// collection is flushed, dropped, and reopened from disk.
#[test]
fn test_upsert_intra_batch_wal_replay_consistency() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().to_path_buf();

    let writer = Collection::create(path.clone(), 4, DistanceMetric::Cosine).unwrap();
    writer
        .upsert(vec![
            Point::new(
                1,
                vec![1.0, 0.0, 0.0, 0.0],
                Some(serde_json::json!({"a": 1})),
            ),
            Point::new(
                1,
                vec![0.0, 1.0, 0.0, 0.0],
                Some(serde_json::json!({"b": 2})),
            ),
            Point::without_payload(2, vec![0.5, 0.5, 0.0, 0.0]),
            Point::new(
                2,
                vec![0.0, 0.5, 0.5, 0.0],
                Some(serde_json::json!({"c": 3})),
            ),
        ])
        .unwrap();
    writer.flush().unwrap();
    // Drop the first handle before the directory is opened again.
    drop(writer);

    let reopened = Collection::open(path).unwrap();
    let results = reopened.get(&[1, 2]);

    let p1 = results[0].as_ref().expect("id=1 should exist after reload");
    assert_eq!(p1.payload, Some(serde_json::json!({"b": 2})));
    assert_eq!(p1.vector, vec![0.0, 1.0, 0.0, 0.0]);

    let p2 = results[1].as_ref().expect("id=2 should exist after reload");
    assert_eq!(p2.payload, Some(serde_json::json!({"c": 3})));
    assert_eq!(p2.vector, vec![0.0, 0.5, 0.5, 0.0]);
}
/// Three writes to the same id in one batch must leave exactly one id in the
/// payload storage, holding the payload of the last write.
#[test]
fn test_upsert_intra_batch_wal_dedup_reduces_entries() {
    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();

    // Same id three times; only the vector and the payload tag vary.
    let mut batch = Vec::new();
    for (vector, tag) in [
        (vec![1.0, 0.0, 0.0, 0.0], "A"),
        (vec![0.0, 1.0, 0.0, 0.0], "B"),
        (vec![0.0, 0.0, 1.0, 0.0], "C"),
    ] {
        batch.push(Point::new(1, vector, Some(serde_json::json!({ "v": tag }))));
    }
    coll.upsert(batch).unwrap();
    coll.flush().unwrap();

    let storage = coll.payload_storage.read();
    let payload_ids = storage.ids();
    assert_eq!(payload_ids.len(), 1, "should have 1 unique payload ID");
    assert!(
        payload_ids.contains(&1),
        "id=1 should be in payload storage"
    );

    let payload = storage.retrieve(1).unwrap();
    assert_eq!(payload, Some(serde_json::json!({"v": "C"})));
}
/// Stores 500 points (128 dimensions, each with a payload) in a single `upsert`
/// call, then verifies every stored vector and payload and runs a follow-up
/// search.
#[test]
fn test_batch_store_all_parallel_io_correctness() {
    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(dir.path().to_path_buf(), 128, DistanceMetric::Cosine).unwrap();

    #[allow(clippy::cast_precision_loss)]
    let points: Vec<Point> = (0u64..500)
        .map(|i| {
            let v: Vec<f32> = (0..128).map(|d| (i as f32 + d as f32) * 0.001).collect();
            let payload = serde_json::json!({"idx": i, "label": format!("point_{i}")});
            Point::new(i, v, Some(payload))
        })
        .collect();
    // `points` is not used after this call, so it is moved instead of cloned.
    coll.upsert(points).expect("upsert should succeed");
    assert_eq!(coll.len(), 500, "all 500 points should be stored");

    let ids: Vec<u64> = (0..500).collect();
    let results = coll.get(&ids);
    for (i, result) in results.iter().enumerate() {
        let p = result
            .as_ref()
            .unwrap_or_else(|| panic!("point {i} should exist"));
        assert_eq!(p.vector.len(), 128, "point {i} should have 128 dimensions");

        // Component 0 of point i was generated as (i + 0) * 0.001.
        #[allow(clippy::cast_precision_loss)]
        let expected_first = i as f32 * 0.001;
        assert!(
            (p.vector[0] - expected_first).abs() < 1e-6,
            "point {i} first element mismatch"
        );

        let payload = p
            .payload
            .as_ref()
            .unwrap_or_else(|| panic!("point {i} should have payload"));
        assert_eq!(payload["idx"], i as u64, "point {i} payload.idx mismatch");
    }

    #[allow(clippy::cast_precision_loss)]
    let query: Vec<f32> = (0..128).map(|d| d as f32 * 0.001).collect();
    let search_results = coll.search(&query, 10).expect("search should succeed");
    assert_eq!(search_results.len(), 10, "search should return k results");
}
/// 100 points written in one batch must all be present — vectors and payloads —
/// after the collection is flushed, dropped, and reopened.
#[test]
fn test_batch_store_all_parallel_io_survives_reopen() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().to_path_buf();

    let writer = Collection::create(path.clone(), 32, DistanceMetric::Cosine).unwrap();
    let mut points: Vec<Point> = Vec::with_capacity(100);
    for i in 0u64..100 {
        #[allow(clippy::cast_precision_loss)]
        let v: Vec<f32> = (0..32).map(|d| (i as f32 + d as f32) * 0.01).collect();
        points.push(Point::new(i, v, Some(serde_json::json!({"id": i}))));
    }
    writer.upsert(points).expect("upsert should succeed");
    writer.flush().expect("flush should succeed");
    // Drop the writing handle before the directory is opened again.
    drop(writer);

    let coll2 = Collection::open(path).unwrap();
    assert_eq!(coll2.len(), 100, "all points should survive reopen");

    // Spot-check the first, a middle, and the last id.
    let probe_ids = [0u64, 50, 99];
    let results = coll2.get(&probe_ids);
    for (i, &id) in probe_ids.iter().enumerate() {
        let p = results[i]
            .as_ref()
            .unwrap_or_else(|| panic!("point {id} should exist after reopen"));
        assert_eq!(p.vector.len(), 32);
        let payload = p
            .payload
            .as_ref()
            .unwrap_or_else(|| panic!("point {id} should have payload after reopen"));
        assert_eq!(payload["id"], id);
    }
}
/// A batch of 200 payload-less points must be stored in full, and a retrieved
/// point must come back with its vector and no payload.
#[test]
fn test_batch_store_all_parallel_io_no_payloads() {
    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 16, DistanceMetric::Cosine).unwrap();

    let mut points: Vec<Point> = Vec::with_capacity(200);
    for i in 0u64..200 {
        #[allow(clippy::cast_precision_loss)]
        let v: Vec<f32> = (0..16).map(|d| (i as f32 + d as f32) * 0.01).collect();
        points.push(Point::without_payload(i, v));
    }
    coll.upsert(points).expect("upsert should succeed");
    assert_eq!(coll.len(), 200, "all points should be stored");

    let fetched = coll.get(&[0]);
    let p0 = fetched[0].as_ref().expect("point 0 should exist");
    assert_eq!(p0.vector.len(), 16);
    assert!(p0.payload.is_none(), "no payload should be stored");
}
/// A batch that repeats an already-stored id must succeed, and the last
/// occurrence in the batch must win for both vector and payload.
#[test]
fn test_batch_store_all_parallel_io_with_duplicates() {
    // Builds the `{"v": <tag>}` payload used by the batch and the assertions.
    let tagged = |tag: &str| Some(serde_json::json!({ "v": tag }));

    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();

    // id=1 is present before the duplicate-carrying batch is applied.
    let preexisting = Point::new(
        1,
        vec![0.1, 0.2, 0.3, 0.4],
        Some(serde_json::json!({"pre": "existing"})),
    );
    coll.upsert(vec![preexisting]).unwrap();

    coll.upsert(vec![
        Point::new(1, vec![1.0, 0.0, 0.0, 0.0], tagged("A")),
        Point::new(1, vec![0.0, 1.0, 0.0, 0.0], tagged("B")),
        Point::new(2, vec![0.5, 0.5, 0.0, 0.0], tagged("C")),
    ])
    .expect("batch with duplicates should succeed via parallel I/O");

    let results = coll.get(&[1, 2]);

    let p1 = results[0].as_ref().expect("id=1 should exist");
    assert_eq!(p1.payload, tagged("B"), "last writer wins for payload");
    assert_eq!(
        p1.vector,
        vec![0.0, 1.0, 0.0, 0.0],
        "last writer wins for vector"
    );

    let p2 = results[1].as_ref().expect("id=2 should exist");
    assert_eq!(p2.payload, tagged("C"));
}
/// `upsert_bulk_from_raw` with a flat vector buffer, ids, and a mix of present
/// and absent payloads must store each point with its own vector and payload.
#[test]
fn test_upsert_bulk_from_raw_basic() {
    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();

    // One row per point; flattened row-major into the raw buffer.
    let rows: [[f32; 4]; 3] = [
        [1.0, 0.0, 0.0, 0.0],
        [0.0, 1.0, 0.0, 0.0],
        [0.0, 0.0, 1.0, 0.0],
    ];
    let vectors: Vec<f32> = rows.concat();
    let ids: Vec<u64> = vec![10, 20, 30];
    let payloads = vec![
        Some(serde_json::json!({"tag": "a"})),
        None,
        Some(serde_json::json!({"tag": "c"})),
    ];

    let inserted = coll
        .upsert_bulk_from_raw(&vectors, &ids, 4, Some(&payloads))
        .expect("upsert_bulk_from_raw should succeed");
    assert_eq!(inserted, 3);
    assert_eq!(coll.len(), 3);

    let fetched = coll.get(&[10, 20, 30]);

    let p10 = fetched[0].as_ref().expect("id=10 should exist");
    assert_eq!(p10.vector, vec![1.0, 0.0, 0.0, 0.0]);
    assert_eq!(p10.payload, Some(serde_json::json!({"tag": "a"})));

    let p20 = fetched[1].as_ref().expect("id=20 should exist");
    assert_eq!(p20.vector, vec![0.0, 1.0, 0.0, 0.0]);
    assert!(p20.payload.is_none());

    let p30 = fetched[2].as_ref().expect("id=30 should exist");
    assert_eq!(p30.vector, vec![0.0, 0.0, 1.0, 0.0]);
    assert_eq!(p30.payload, Some(serde_json::json!({"tag": "c"})));
}
/// `upsert_bulk_from_raw` called with `None` for payloads must insert every
/// vector and store no payload.
#[test]
fn test_upsert_bulk_from_raw_no_payloads() {
    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();

    // Two 4-d vectors laid out back to back.
    let flat: Vec<f32> = vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8];
    let point_ids: Vec<u64> = vec![1, 2];

    let inserted = coll
        .upsert_bulk_from_raw(&flat, &point_ids, 4, None)
        .expect("upsert_bulk_from_raw without payloads should succeed");
    assert_eq!(inserted, 2);
    assert_eq!(coll.len(), 2);

    let fetched = coll.get(&[1, 2]);
    let p1 = fetched[0].as_ref().expect("id=1 should exist");
    assert_eq!(p1.vector, vec![0.1, 0.2, 0.3, 0.4]);
    assert!(p1.payload.is_none());
}
/// Passing a dimension (3) that differs from the collection's (4) must be
/// rejected with an error whose text contains `VELES-004`.
#[test]
fn test_upsert_bulk_from_raw_dimension_mismatch() {
    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();

    // Six floats form two well-shaped 3-d vectors; only the dimension is wrong.
    let flat: Vec<f32> = vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6];
    let point_ids: Vec<u64> = vec![1, 2];

    let err_msg = match coll.upsert_bulk_from_raw(&flat, &point_ids, 3, None) {
        Err(err) => err.to_string(),
        Ok(_) => panic!("should fail on dimension mismatch"),
    };
    assert!(
        err_msg.contains("VELES-004"),
        "should be DimensionMismatch error: {err_msg}"
    );
}
/// A flat buffer of 5 floats for 2 ids at dimension 4 must be rejected with an
/// error whose text contains `VELES-005`.
#[test]
fn test_upsert_bulk_from_raw_vector_length_mismatch() {
    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();

    // 2 ids * 4 dims would need 8 floats; only 5 are supplied.
    let flat: Vec<f32> = vec![0.1, 0.2, 0.3, 0.4, 0.5];
    let point_ids: Vec<u64> = vec![1, 2];

    let err_msg = match coll.upsert_bulk_from_raw(&flat, &point_ids, 4, None) {
        Err(err) => err.to_string(),
        Ok(_) => panic!("should fail on vector length mismatch"),
    };
    assert!(
        err_msg.contains("VELES-005"),
        "should be InvalidVector error: {err_msg}"
    );
}
/// Supplying one payload for two ids must make `upsert_bulk_from_raw` fail.
#[test]
fn test_upsert_bulk_from_raw_payload_length_mismatch() {
    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();

    let flat: Vec<f32> = vec![0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8];
    let point_ids: Vec<u64> = vec![1, 2];
    // Only one payload entry for two points.
    let short_payloads = vec![Some(serde_json::json!({"x": 1}))];

    let outcome = coll.upsert_bulk_from_raw(&flat, &point_ids, 4, Some(&short_payloads));
    assert!(outcome.is_err(), "should fail on payload length mismatch");
}
/// Empty vector and id buffers must be accepted, inserting nothing.
#[test]
fn test_upsert_bulk_from_raw_empty() {
    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();

    let outcome = coll.upsert_bulk_from_raw(&[], &[], 4, None);
    let inserted = outcome.expect("empty call should succeed");

    assert_eq!(inserted, 0);
    assert_eq!(coll.len(), 0);
}
/// Points inserted through `upsert_bulk_from_raw` must be searchable, and a
/// query equal to point 0's vector must return point 0 first.
#[test]
fn test_upsert_bulk_from_raw_searchable() {
    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();

    // 50 vectors of 4 components each, appended to one flat buffer.
    let mut vectors: Vec<f32> = Vec::with_capacity(50 * 4);
    for i in 0u64..50 {
        #[allow(clippy::cast_precision_loss)]
        let base = i as f32 * 0.02;
        vectors.extend_from_slice(&[base, base + 0.01, base + 0.02, base + 0.03]);
    }
    let ids: Vec<u64> = (0..50).collect();

    coll.upsert_bulk_from_raw(&vectors, &ids, 4, None)
        .expect("bulk insert should succeed");
    assert_eq!(coll.len(), 50);

    let query = vec![0.0_f32, 0.01, 0.02, 0.03];
    let hits = coll.search(&query, 5).expect("search should succeed");
    assert_eq!(hits.len(), 5, "search should return k=5 results");
    assert_eq!(hits[0].point.id, 0, "nearest neighbor should be point 0");
}
/// Points written via `upsert_bulk_from_raw` must keep their vectors and
/// payloads across a flush, drop, and reopen of the collection.
#[test]
fn test_upsert_bulk_from_raw_persistence_roundtrip() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().to_path_buf();

    let writer = Collection::create(path.clone(), 4, DistanceMetric::Cosine).unwrap();
    let vectors: Vec<f32> = vec![1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0];
    let ids: Vec<u64> = vec![100, 200];
    let payloads = vec![
        Some(serde_json::json!({"key": "first"})),
        Some(serde_json::json!({"key": "second"})),
    ];
    writer
        .upsert_bulk_from_raw(&vectors, &ids, 4, Some(&payloads))
        .expect("insert should succeed");
    writer.flush().expect("flush should succeed");
    // Drop the writing handle before the directory is opened again.
    drop(writer);

    let reopened = Collection::open(path).unwrap();
    assert_eq!(reopened.len(), 2);

    let fetched = reopened.get(&[100, 200]);

    let p100 = fetched[0].as_ref().expect("id=100 should survive reopen");
    assert_eq!(p100.vector, vec![1.0, 0.0, 0.0, 0.0]);
    assert_eq!(p100.payload, Some(serde_json::json!({"key": "first"})));

    let p200 = fetched[1].as_ref().expect("id=200 should survive reopen");
    assert_eq!(p200.vector, vec![0.0, 1.0, 0.0, 0.0]);
    assert_eq!(p200.payload, Some(serde_json::json!({"key": "second"})));
}
/// The raw-buffer path (`upsert_bulk_from_raw`) and the `Point` path
/// (`upsert_bulk`) must store identical vectors and payloads for the same data.
#[test]
fn test_upsert_bulk_from_raw_parity_with_upsert_bulk() {
    let dim = 8;
    let n = 100;

    // Build both input shapes from the same per-point vectors.
    let mut flat_vectors: Vec<f32> = Vec::new();
    let mut payloads: Vec<Option<serde_json::Value>> = Vec::new();
    let mut points: Vec<Point> = Vec::new();
    for i in 0u64..n {
        #[allow(clippy::cast_precision_loss)]
        let v: Vec<f32> = (0..dim).map(|d| (i as f32 + d as f32) * 0.01).collect();
        flat_vectors.extend_from_slice(&v);
        payloads.push(Some(serde_json::json!({"idx": i})));
        points.push(Point::new(i, v, Some(serde_json::json!({"idx": i}))));
    }
    let id_list: Vec<u64> = (0..n).collect();

    // Collection A: raw-buffer path.
    let dir_a = tempfile::tempdir().unwrap();
    let coll_a =
        Collection::create(dir_a.path().to_path_buf(), dim, DistanceMetric::Cosine).unwrap();
    coll_a
        .upsert_bulk_from_raw(&flat_vectors, &id_list, dim, Some(&payloads))
        .expect("raw path should succeed");

    // Collection B: `Point` path.
    let dir_b = tempfile::tempdir().unwrap();
    let coll_b =
        Collection::create(dir_b.path().to_path_buf(), dim, DistanceMetric::Cosine).unwrap();
    coll_b
        .upsert_bulk(&points)
        .expect("point path should succeed");

    assert_eq!(coll_a.len(), coll_b.len());

    let results_a = coll_a.get(&id_list);
    let results_b = coll_b.get(&id_list);
    let count = usize::try_from(n).expect("n fits in usize");
    for i in 0..count {
        let pa = results_a[i]
            .as_ref()
            .unwrap_or_else(|| panic!("raw: point {i} missing"));
        let pb = results_b[i]
            .as_ref()
            .unwrap_or_else(|| panic!("bulk: point {i} missing"));
        assert_eq!(pa.vector, pb.vector, "vector mismatch at point {i}");
        assert_eq!(pa.payload, pb.payload, "payload mismatch at point {i}");
    }
}
/// With a secondary index on `category`, upserting points whose payloads carry
/// that field must leave the index non-empty.
///
/// NOTE(review): "phase2" in the name presumably refers to a post-storage
/// indexing step of `upsert` — confirm against the upsert implementation.
#[test]
fn test_phase2_runs_when_secondary_indexes_exist() {
    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();
    coll.create_index("category").unwrap();

    let books = Point::new(
        1,
        vec![1.0, 0.0, 0.0, 0.0],
        Some(serde_json::json!({"category": "books"})),
    );
    let movies = Point::new(
        2,
        vec![0.0, 1.0, 0.0, 0.0],
        Some(serde_json::json!({"category": "movies"})),
    );
    coll.upsert(vec![books, movies]).unwrap();

    let indexes = coll.secondary_indexes.read();
    let cat_index = indexes.get("category").expect("index should exist");
    // Irrefutable destructuring: stops compiling if another variant is added.
    let crate::index::SecondaryIndex::BTree(tree) = cat_index;
    let has_entries = !tree.read().is_empty();
    assert!(
        has_entries,
        "secondary index should contain entries after upsert"
    );
}
/// With no secondary indexes and no payloads, a 100-point upsert must still
/// store every point and leave the collection searchable.
#[test]
fn test_phase2_fast_path_correctness_no_secondaries() {
    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();

    let mut points: Vec<Point> = Vec::with_capacity(100);
    for i in 0u64..100 {
        #[allow(clippy::cast_precision_loss)]
        let v: Vec<f32> = (0..4).map(|d| (i as f32 + d as f32) * 0.01).collect();
        points.push(Point::without_payload(i, v));
    }
    coll.upsert(points).unwrap();
    assert_eq!(coll.len(), 100, "all points should be stored");

    let hits = coll.search(&[0.5, 0.5, 0.5, 0.5], 10).unwrap();
    assert_eq!(hits.len(), 10, "search should return k results");
}
/// A point with a sparse vector but no payload must still be added to the
/// default sparse index.
#[test]
fn test_phase2_does_not_skip_with_sparse_vectors() {
    use crate::index::sparse::SparseVector;

    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();

    let mut sparse_by_name = BTreeMap::new();
    sparse_by_name.insert(String::new(), SparseVector::new(vec![(1, 1.0), (2, 0.5)]));
    coll.upsert(vec![Point::with_sparse(
        1,
        vec![0.1, 0.2, 0.3, 0.4],
        None,
        Some(sparse_by_name),
    )])
    .unwrap();

    let indexes = coll.sparse_indexes().read();
    assert!(
        indexes.contains_key(""),
        "sparse index should be populated despite no payloads"
    );
    let indexed_docs = indexes.get("").unwrap().doc_count();
    assert_eq!(indexed_docs, 1);
}
/// A bulk insert mixing a text-bearing payload with a payload-less point must
/// leave the text index non-empty.
#[test]
fn test_bulk_bm25_skip_does_not_lose_text() {
    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();

    let with_text = Point::new(
        1,
        vec![1.0, 0.0, 0.0, 0.0],
        Some(serde_json::json!({"text": "hello world"})),
    );
    let without_payload = Point::without_payload(2, vec![0.0, 1.0, 0.0, 0.0]);
    let points = vec![with_text, without_payload];
    coll.upsert_bulk(&points).unwrap();

    let text_indexed = !coll.text_index.is_empty();
    assert!(
        text_indexed,
        "BM25 index should contain the document from bulk insert"
    );
}
/// A batch with one duplicated id and one unique id must resolve to two points,
/// with the duplicated id holding the vector and payload of its last occurrence.
#[test]
fn test_dedup_map_consolidation_correctness() {
    // Builds the `{"v": <tag>}` payload used by the batch and the assertions.
    let tagged = |tag: &str| Some(serde_json::json!({ "v": tag }));

    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create(PathBuf::from(dir.path()), 4, DistanceMetric::Cosine).unwrap();

    coll.upsert(vec![
        Point::new(1, vec![1.0, 0.0, 0.0, 0.0], tagged("first")),
        Point::new(1, vec![0.0, 1.0, 0.0, 0.0], tagged("second")),
        Point::new(2, vec![0.0, 0.0, 1.0, 0.0], tagged("only")),
    ])
    .unwrap();

    let fetched = coll.get(&[1, 2]);

    let p1 = fetched[0].as_ref().expect("id=1 should exist");
    assert_eq!(
        p1.payload,
        tagged("second"),
        "shared dedup map should preserve last-writer-wins for payload"
    );
    assert_eq!(
        p1.vector,
        vec![0.0, 1.0, 0.0, 0.0],
        "shared dedup map should preserve last-writer-wins for vector"
    );

    let p2 = fetched[1].as_ref().expect("id=2 should exist");
    assert_eq!(p2.payload, tagged("only"));

    assert_eq!(coll.len(), 2, "should have 2 unique points");
}
/// In `StorageMode::SQ8`, upserting a single payload-less point must add one
/// entry to the SQ8 cache.
#[test]
fn test_phase2_runs_for_sq8_storage_mode() {
    let dir = tempfile::tempdir().unwrap();
    let coll = Collection::create_with_options(
        PathBuf::from(dir.path()),
        4,
        DistanceMetric::Cosine,
        StorageMode::SQ8,
    )
    .unwrap();

    coll.upsert(vec![Point::without_payload(1, vec![1.0, 0.0, 0.0, 0.0])])
        .unwrap();

    let cached_entries = coll.sq8_cache.read().len();
    assert_eq!(
        cached_entries, 1,
        "SQ8 cache should be populated — Phase 2 must not skip"
    );
}