#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::time::Duration;
use quiver_server::{Config, RateLimitConfig, serve};
use tokio::net::TcpListener;
async fn wait_ready(http: &reqwest::Client, base: &str) {
for _ in 0..200 {
if let Ok(resp) = http.get(format!("{base}/healthz")).send().await
&& resp.status().is_success()
{
return;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
panic!("server did not become ready");
}
#[tokio::test]
async fn rest_rate_limit_admits_a_burst_then_returns_429() {
let tmp = tempfile::tempdir().unwrap();
let rest_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let grpc_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let rest_addr = rest_listener.local_addr().unwrap();
let grpc_addr = grpc_listener.local_addr().unwrap();
let config = Config {
data_dir: tmp.path().to_path_buf(),
rest_addr,
grpc_addr,
insecure: true,
rate_limit: RateLimitConfig {
requests_per_second: 1,
burst: 2,
},
..Default::default()
};
let server = tokio::spawn(async move {
let _ = serve(config, rest_listener, grpc_listener).await;
});
let http = reqwest::Client::new();
let base = format!("http://{rest_addr}");
wait_ready(&http, &base).await;
let list = format!("{base}/v1/collections");
for _ in 0..2 {
let resp = http.get(&list).send().await.unwrap();
assert!(resp.status().is_success());
assert_eq!(resp.headers()["RateLimit-Limit"], "2");
assert!(resp.headers().contains_key("RateLimit-Remaining"));
assert!(resp.headers().contains_key("RateLimit-Reset"));
}
let limited = http.get(&list).send().await.unwrap();
assert_eq!(limited.status(), 429);
assert!(limited.headers().contains_key("Retry-After"));
assert_eq!(limited.headers()["RateLimit-Remaining"], "0");
for _ in 0..5 {
assert!(
http.get(format!("{base}/healthz"))
.send()
.await
.unwrap()
.status()
.is_success()
);
}
server.abort();
}
#[tokio::test]
async fn no_rate_limit_headers_when_disabled() {
let tmp = tempfile::tempdir().unwrap();
let rest_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let grpc_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let rest_addr = rest_listener.local_addr().unwrap();
let grpc_addr = grpc_listener.local_addr().unwrap();
let config = Config {
data_dir: tmp.path().to_path_buf(),
rest_addr,
grpc_addr,
insecure: true,
..Default::default()
};
let server = tokio::spawn(async move {
let _ = serve(config, rest_listener, grpc_listener).await;
});
let http = reqwest::Client::new();
let base = format!("http://{rest_addr}");
wait_ready(&http, &base).await;
for _ in 0..20 {
let resp = http
.get(format!("{base}/v1/collections"))
.send()
.await
.unwrap();
assert!(resp.status().is_success());
assert!(!resp.headers().contains_key("RateLimit-Limit"));
}
server.abort();
}