#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::time::Duration;
use quiver_server::{Config, serve};
use serde_json::{Value, json};
use tokio::net::TcpListener;
async fn wait_ready(http: &reqwest::Client, base: &str) {
for _ in 0..300 {
if let Ok(r) = http.get(format!("{base}/healthz")).send().await
&& r.status().is_success()
{
return;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
panic!("server {base} did not become ready");
}
async fn boot(mut cfg: Config) -> String {
let rest = TcpListener::bind("127.0.0.1:0").await.unwrap();
let grpc = TcpListener::bind("127.0.0.1:0").await.unwrap();
cfg.rest_addr = rest.local_addr().unwrap();
cfg.grpc_addr = grpc.local_addr().unwrap();
cfg.insecure = true;
let base = format!("http://{}", cfg.rest_addr);
tokio::spawn(async move {
let _ = serve(cfg, rest, grpc).await;
});
base
}
fn vec_for(i: u32) -> Vec<f32> {
(0..8)
.map(|j| (((i * 7 + j * 13) % 91) as f32) / 9.0)
.collect()
}
async fn create(http: &reqwest::Client, base: &str) {
http.post(format!("{base}/v1/collections"))
.json(&json!({"name": "c", "dim": 8, "metric": "l2"}))
.send()
.await
.unwrap()
.error_for_status()
.unwrap();
}
async fn upsert_range(http: &reqwest::Client, base: &str, lo: u32, hi: u32) {
let points: Vec<Value> = (lo..hi)
.map(|i| json!({"id": format!("p{i}"), "vector": vec_for(i), "payload": {"i": i}}))
.collect();
http.post(format!("{base}/v1/collections/c/points"))
.json(&json!({ "points": points }))
.send()
.await
.unwrap()
.error_for_status()
.unwrap();
}
async fn count(http: &reqwest::Client, base: &str) -> u64 {
let resp: Value = http
.get(format!("{base}/v1/collections/c"))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
resp["count"].as_u64().unwrap_or(0)
}
async fn top_scores(http: &reqwest::Client, base: &str, q: &[f32], k: usize) -> Vec<f32> {
let resp: Value = http
.post(format!("{base}/v1/collections/c/query"))
.json(&json!({"vector": q, "k": k, "ef_search": 256, "with_payload": false, "with_vector": false}))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
resp["matches"]
.as_array()
.map(|a| {
a.iter()
.map(|m| m["score"].as_f64().unwrap() as f32)
.collect()
})
.unwrap_or_default()
}
fn close(got: &[f32], want: &[f32]) -> bool {
got.len() == want.len() && got.iter().zip(want).all(|(g, w)| (g - w).abs() < 1e-4)
}
async fn router_map_version(http: &reqwest::Client, router: &str) -> u64 {
let resp: Value = http
.get(format!("{router}/cluster/map"))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
resp["version"].as_u64().unwrap_or(0)
}
async fn wait_router_version(http: &reqwest::Client, router: &str, version: u64) {
for _ in 0..400 {
if router_map_version(http, router).await >= version {
return;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
panic!("router did not refresh to map version {version}");
}
#[tokio::test]
async fn router_refreshes_membership_from_the_coordinator() {
let dirs: Vec<_> = (0..6).map(|_| tempfile::tempdir().unwrap()).collect();
let state = tempfile::tempdir().unwrap();
let http = reqwest::Client::new();
let s0 = boot(Config {
data_dir: dirs[0].path().into(),
..Default::default()
})
.await;
let s1 = boot(Config {
data_dir: dirs[1].path().into(),
..Default::default()
})
.await;
let s2 = boot(Config {
data_dir: dirs[2].path().into(),
..Default::default()
})
.await;
let baseline = boot(Config {
data_dir: dirs[3].path().into(),
..Default::default()
})
.await;
for b in [&s0, &s1, &s2, &baseline] {
wait_ready(&http, b).await;
}
let coordinator = boot(Config {
data_dir: dirs[4].path().into(),
coordinator: true,
coordinator_state: Some(state.path().join("coord.json")),
autoscale: Default::default(),
raft_node_id: None,
raft_members: Vec::new(),
cluster_shards: vec![s0.clone(), s1.clone()],
..Default::default()
})
.await;
wait_ready(&http, &coordinator).await;
let router = boot(Config {
data_dir: dirs[5].path().into(),
cluster_shards: vec![s0.clone(), s1.clone()],
coordinator_url: Some(coordinator.clone()),
..Default::default()
})
.await;
wait_ready(&http, &router).await;
for b in [&s0, &s1, &s2, &baseline] {
create(&http, b).await;
}
upsert_range(&http, &router, 0, 90).await;
upsert_range(&http, &baseline, 0, 90).await;
assert_eq!(count(&http, &s2).await, 0, "s2 is not in the cluster yet");
assert_eq!(
router_map_version(&http, &router).await,
0,
"seed map is v0"
);
let added: Value = http
.post(format!("{coordinator}/cluster/shards"))
.json(&json!({ "primary_url": s2 }))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
assert_eq!(added["version"].as_u64().unwrap(), 1);
assert_eq!(added["shards"].as_array().unwrap().len(), 3);
wait_router_version(&http, &router, 1).await;
upsert_range(&http, &router, 90, 180).await;
upsert_range(&http, &baseline, 90, 180).await;
assert!(
count(&http, &s2).await > 0,
"router did not route to the added shard after refresh"
);
let health: Value = http
.get(format!("{coordinator}/cluster/health"))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
for id in ["0", "1", "2"] {
assert_eq!(health[id], json!(true), "shard {id} should be healthy");
}
let bad = http
.request(
reqwest::Method::DELETE,
format!("{coordinator}/cluster/shards/99"),
)
.send()
.await
.unwrap();
assert_eq!(bad.status(), reqwest::StatusCode::BAD_REQUEST);
for qi in [5u32, 73, 150] {
let q = vec_for(qi);
let got = top_scores(&http, &router, &q, 10).await;
let want = top_scores(&http, &baseline, &q, 10).await;
assert!(close(&got, &want), "router != baseline after add for q{qi}");
}
let removed: Value = http
.request(
reqwest::Method::DELETE,
format!("{coordinator}/cluster/shards/1"),
)
.send()
.await
.unwrap()
.json()
.await
.unwrap();
assert_eq!(removed["version"].as_u64().unwrap(), 2);
assert_eq!(removed["shards"].as_array().unwrap().len(), 2);
wait_router_version(&http, &router, 2).await;
let s1_before = count(&http, &s1).await;
upsert_range(&http, &router, 180, 240).await;
assert_eq!(
count(&http, &s1).await,
s1_before,
"router still routed to the removed shard"
);
}
#[tokio::test]
async fn coordinator_persists_membership_across_restart() {
let dir = tempfile::tempdir().unwrap();
let state_path = dir.path().join("coord.json");
let http = reqwest::Client::new();
let s0 = boot(Config {
data_dir: dir.path().join("s0"),
..Default::default()
})
.await;
let s1 = boot(Config {
data_dir: dir.path().join("s1"),
..Default::default()
})
.await;
wait_ready(&http, &s0).await;
wait_ready(&http, &s1).await;
let coord1 = boot(Config {
data_dir: dir.path().join("c1"),
coordinator: true,
coordinator_state: Some(state_path.clone()),
autoscale: Default::default(),
raft_node_id: None,
raft_members: Vec::new(),
cluster_shards: vec![s0.clone()],
..Default::default()
})
.await;
wait_ready(&http, &coord1).await;
http.post(format!("{coord1}/cluster/shards"))
.json(&json!({ "primary_url": s1 }))
.send()
.await
.unwrap()
.error_for_status()
.unwrap();
let coord2 = boot(Config {
data_dir: dir.path().join("c2"),
coordinator: true,
coordinator_state: Some(state_path.clone()),
autoscale: Default::default(),
raft_node_id: None,
raft_members: Vec::new(),
cluster_shards: vec!["http://ignored:6333".into()],
..Default::default()
})
.await;
wait_ready(&http, &coord2).await;
let recovered: Value = http
.get(format!("{coord2}/cluster/map"))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
assert_eq!(
recovered["version"].as_u64().unwrap(),
1,
"recovered version"
);
let ids: Vec<u64> = recovered["shards"]
.as_array()
.unwrap()
.iter()
.map(|s| s["id"].as_u64().unwrap())
.collect();
assert_eq!(ids, [0, 1], "recovered both shards with their ids");
let next: Value = http
.post(format!("{coord2}/cluster/shards"))
.json(&json!({ "primary_url": s0 }))
.send()
.await
.unwrap()
.json()
.await
.unwrap();
let new_id = next["shards"].as_array().unwrap().last().unwrap()["id"]
.as_u64()
.unwrap();
assert_eq!(new_id, 2, "id counter persisted across restart");
}