#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::time::Duration;
use quiver_server::{Config, serve};
use serde_json::{Value, json};
use tokio::net::TcpListener;
async fn wait_ready(http: &reqwest::Client, base: &str) {
for _ in 0..300 {
if let Ok(r) = http.get(format!("{base}/healthz")).send().await
&& r.status().is_success()
{
return;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
panic!("server {base} did not become ready");
}
/// Starts a server in a background task and returns its REST base URL.
///
/// Two listeners are bound to `127.0.0.1:0` so the OS picks free ports; their
/// addresses are written into `config.rest_addr` / `config.grpc_addr`, and
/// `config.insecure` is set to `true` before the config and both listeners
/// are handed to `serve`.
///
/// The spawned task is detached and whatever `serve` returns is discarded.
///
/// # Panics
///
/// Panics if a listener cannot be bound or its local address cannot be read.
async fn boot(mut config: Config) -> String {
    let rest_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let grpc_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();

    let rest_addr = rest_listener.local_addr().unwrap();
    config.rest_addr = rest_addr;
    config.grpc_addr = grpc_listener.local_addr().unwrap();
    config.insecure = true;

    tokio::spawn(async move {
        let _ = serve(config, rest_listener, grpc_listener).await;
    });

    format!("http://{rest_addr}")
}
/// Builds a deterministic 8-component vector for index `i`.
///
/// Component `j` (for `j` in `0..8`) is `((i * 7 + j * 13) % 91) / 9.0`,
/// so every component lies in `[0.0, 10.0]`.
fn vec_for(i: u32) -> Vec<f32> {
    let mut components = Vec::with_capacity(8);
    for j in 0..8u32 {
        let bucket = (i * 7 + j * 13) % 91;
        components.push((bucket as f32) / 9.0);
    }
    components
}
/// Creates the collection `c` (dim 8, metric `l2`) via `POST {base}/v1/collections`.
///
/// # Panics
///
/// Panics if the request fails or the server answers with an error status.
async fn create(http: &reqwest::Client, base: &str) {
    let url = format!("{base}/v1/collections");
    let spec = json!({"name": "c", "dim": 8, "metric": "l2"});
    let response = http.post(url).json(&spec).send().await.unwrap();
    response.error_for_status().unwrap();
}
/// Upserts `n` points into collection `c` with a single
/// `POST {base}/v1/collections/c/points` request.
///
/// Point `i` has id `p{i}`, vector `vec_for(i)` and payload `{"i": i}`.
///
/// # Panics
///
/// Panics if the request fails or the server answers with an error status.
async fn upsert_all(http: &reqwest::Client, base: &str, n: u32) {
    let make_point =
        |i: u32| -> Value { json!({"id": format!("p{i}"), "vector": vec_for(i), "payload": {"i": i}}) };
    let points: Vec<Value> = (0..n).map(make_point).collect();

    let url = format!("{base}/v1/collections/c/points");
    let body = json!({ "points": points });
    let response = http.post(url).json(&body).send().await.unwrap();
    response.error_for_status().unwrap();
}
/// Queries collection `c` for the `k` nearest matches to `q` and returns their scores.
///
/// Sends `POST {base}/v1/collections/c/query` with `ef_search` fixed at 256 and
/// payloads/vectors excluded, then collects the `score` of every entry in the
/// response's `matches` array, in response order.
///
/// # Panics
///
/// Panics if the request fails, the server answers with an error status, the
/// body is not JSON, or the body lacks a `matches` array of objects with a
/// numeric `score`.
async fn top_scores(http: &reqwest::Client, base: &str, q: &[f32], k: usize) -> Vec<f32> {
    let resp: Value = http
        .post(format!("{base}/v1/collections/c/query"))
        .json(&json!({"vector": q, "k": k, "ef_search": 256, "with_payload": false, "with_vector": false}))
        .send()
        .await
        .unwrap()
        // Fail on the HTTP status itself (as `create`/`upsert_all` do) rather than
        // on a confusing missing-field panic while picking apart an error body.
        .error_for_status()
        .unwrap()
        .json()
        .await
        .unwrap();
    resp["matches"]
        .as_array()
        .expect("query response has no `matches` array")
        .iter()
        .map(|m| m["score"].as_f64().expect("match has no numeric `score`") as f32)
        .collect()
}
/// Returns the `count` field reported by `GET {base}/v1/collections/c`.
///
/// The test uses this value as the number of points stored on that server.
///
/// # Panics
///
/// Panics if the request fails, the server answers with an error status, the
/// body is not JSON, or `count` is missing or not an unsigned integer.
async fn count(http: &reqwest::Client, base: &str) -> u64 {
    let resp: Value = http
        .get(format!("{base}/v1/collections/c"))
        .send()
        .await
        .unwrap()
        // Surface HTTP errors directly, consistent with `create`/`upsert_all`,
        // instead of failing later on a missing `count` field.
        .error_for_status()
        .unwrap()
        .json()
        .await
        .unwrap();
    resp["count"]
        .as_u64()
        .expect("collection info has no unsigned `count` field")
}
/// End-to-end check that a router fronting two shards behaves like one node.
///
/// Boots two shard servers, a router whose `cluster_shards` lists both, and a
/// standalone baseline server. The same 120 points are upserted through the
/// router and into the baseline, then the test verifies that:
///
/// 1. both shards hold data and their counts sum to 120;
/// 2. top-10 query scores from the router match the baseline within `1e-4`;
/// 3. deleting three ids through the router lowers the combined shard count by 3.
#[tokio::test]
async fn cluster_router_matches_single_node_ground_truth() {
    // One data directory per server. The guards stay bound until the end of the
    // test so the directories are not removed while the servers are in use.
    let shard0_dir = tempfile::tempdir().unwrap();
    let shard1_dir = tempfile::tempdir().unwrap();
    let router_dir = tempfile::tempdir().unwrap();
    let baseline_dir = tempfile::tempdir().unwrap();

    let http = reqwest::Client::new();

    // Shards must be ready before the router that points at them is started.
    let shard0 = boot(Config {
        data_dir: shard0_dir.path().into(),
        ..Default::default()
    })
    .await;
    let shard1 = boot(Config {
        data_dir: shard1_dir.path().into(),
        ..Default::default()
    })
    .await;
    wait_ready(&http, &shard0).await;
    wait_ready(&http, &shard1).await;

    let router = boot(Config {
        data_dir: router_dir.path().into(),
        cluster_shards: vec![shard0.clone(), shard1.clone()],
        ..Default::default()
    })
    .await;
    let baseline = boot(Config {
        data_dir: baseline_dir.path().into(),
        ..Default::default()
    })
    .await;
    wait_ready(&http, &router).await;
    wait_ready(&http, &baseline).await;

    // Load identical data through the router and into the single-node baseline.
    create(&http, &router).await;
    create(&http, &baseline).await;
    upsert_all(&http, &router, 120).await;
    upsert_all(&http, &baseline, 120).await;

    // The router must have spread the points over both shards without loss.
    let (c0, c1) = (count(&http, &shard0).await, count(&http, &shard1).await);
    assert!(
        c0 > 0 && c1 > 0,
        "data did not shard: shard0={c0} shard1={c1}"
    );
    assert_eq!(c0 + c1, 120, "points lost or duplicated across shards");

    for qi in [0u32, 17, 63, 119] {
        let q = vec_for(qi);
        let got = top_scores(&http, &router, &q, 10).await;
        let want = top_scores(&http, &baseline, &q, 10).await;
        assert_eq!(
            got.len(),
            10,
            "query {qi}: router returned {} hits",
            got.len()
        );
        // `zip` stops at the shorter side, so without this check a short or
        // empty baseline result would make the comparison below pass vacuously.
        assert_eq!(
            want.len(),
            10,
            "query {qi}: baseline returned {} hits",
            want.len()
        );
        for (g, w) in got.iter().zip(&want) {
            assert!(
                (g - w).abs() < 1e-4,
                "router distance {g} != single-node {w} for query {qi}"
            );
        }
    }

    // A delete issued through the router must reach whichever shards own the ids.
    let before = count(&http, &shard0).await + count(&http, &shard1).await;
    let resp = http
        .request(
            reqwest::Method::DELETE,
            format!("{router}/v1/collections/c/points"),
        )
        .json(&json!({"ids": ["p0", "p1", "p2"]}))
        .send()
        .await
        .unwrap();
    assert!(
        resp.status().is_success(),
        "routed delete failed with status {}",
        resp.status()
    );
    let after = count(&http, &shard0).await + count(&http, &shard1).await;
    assert_eq!(
        after,
        before - 3,
        "routed delete did not remove across shards"
    );
}