#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::time::Duration;
use quiver_server::{Action, ApiKey, CollectionScope, Config, serve};
use tokio::net::TcpListener;
async fn wait_ready(http: &reqwest::Client, base: &str) {
for _ in 0..200 {
if let Ok(resp) = http.get(format!("{base}/healthz")).send().await
&& resp.status().is_success()
{
return;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
panic!("coordinator did not become ready");
}
#[tokio::test]
async fn coordinator_membership_api_requires_auth() {
let admin = "admin-secret";
let reader = "reader-secret";
let rest_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let grpc_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let rest_addr = rest_listener.local_addr().unwrap();
let grpc_addr = grpc_listener.local_addr().unwrap();
let config = Config {
rest_addr,
grpc_addr,
coordinator: true,
cluster_shards: vec!["http://127.0.0.1:1".into(), "http://127.0.0.1:2".into()],
api_keys: vec![
ApiKey::admin(admin),
ApiKey {
secret: reader.to_owned(),
role: Action::Read,
collections: CollectionScope::All,
id: None,
},
],
..Default::default()
};
let server = tokio::spawn(async move {
let _ = serve(config, rest_listener, grpc_listener).await;
});
let http = reqwest::Client::new();
let base = format!("http://{rest_addr}");
wait_ready(&http, &base).await;
let h = http.get(format!("{base}/healthz")).send().await.unwrap();
assert_eq!(
h.status(),
reqwest::StatusCode::OK,
"healthz must stay open"
);
let add_body = serde_json::json!({"primary_url": "http://127.0.0.1:3", "replica_urls": []});
let no_key = http
.post(format!("{base}/cluster/shards"))
.json(&add_body)
.send()
.await
.unwrap();
assert_eq!(
no_key.status(),
reqwest::StatusCode::UNAUTHORIZED,
"add_shard without a key must be 401"
);
let ro = http
.post(format!("{base}/cluster/shards"))
.bearer_auth(reader)
.json(&add_body)
.send()
.await
.unwrap();
assert_eq!(
ro.status(),
reqwest::StatusCode::FORBIDDEN,
"add_shard with a read-only key must be 403"
);
let ok = http
.post(format!("{base}/cluster/shards"))
.bearer_auth(admin)
.json(&add_body)
.send()
.await
.unwrap();
assert_eq!(
ok.status(),
reqwest::StatusCode::OK,
"add_shard with the admin key must succeed"
);
let map: serde_json::Value = ok.json().await.unwrap();
assert_eq!(map["shards"].as_array().unwrap().len(), 3);
let del_no_key = http
.delete(format!("{base}/cluster/shards/2"))
.send()
.await
.unwrap();
assert_eq!(
del_no_key.status(),
reqwest::StatusCode::UNAUTHORIZED,
"remove_shard without a key must be 401"
);
let map_no_key = http
.get(format!("{base}/cluster/map"))
.send()
.await
.unwrap();
assert_eq!(
map_no_key.status(),
reqwest::StatusCode::UNAUTHORIZED,
"/cluster/map without a key must be 401"
);
let map_ro = http
.get(format!("{base}/cluster/map"))
.bearer_auth(reader)
.send()
.await
.unwrap();
assert_eq!(
map_ro.status(),
reqwest::StatusCode::OK,
"/cluster/map with a read-only key must succeed"
);
server.abort();
}