#![allow(clippy::result_large_err)]
use std::time::{Duration, Instant};
use reqwest::StatusCode;
use serde_json::Value;
use crate::apis::configuration::Configuration;
use crate::http::{backoff_delay, parse_retry_after};
use crate::apis::query_api::QueryError as GeneratedQueryError;
use crate::apis::results_api::GetResultError;
use crate::apis::{query_runs_api, results_api, Error, ResponseContent};
use crate::client::{SESSION_ID_HEADER, WORKSPACE_ID_HEADER};
use crate::models::{AsyncQueryResponse, QueryRequest, QueryResponse, ResultsFormatQuery};
use crate::status::ResultStatus;
/// HTTP status that `submit_with_retry` treats as a retryable rejection.
const HTTP_TOO_MANY_REQUESTS: StatusCode = StatusCode::TOO_MANY_REQUESTS;
/// Error-code string for an overloaded server (exported; not referenced in this section).
pub const OVERLOADED_ERROR_CODE: &str = "OVERLOADED";
/// Row ceiling that `QueryConfig::default()` installs as `max_auto_rows`.
pub const DEFAULT_MAX_AUTO_ROWS: u64 = 1_000_000;
/// Byte ceiling (64 MiB) that `QueryConfig::default()` installs as `max_auto_bytes`.
pub const DEFAULT_MAX_AUTO_BYTES: u64 = 64 * 1024 * 1024;
/// Retry behaviour for query submissions rejected with HTTP 429.
///
/// Read by `submit_with_retry`, and passed to `backoff_delay`, which turns
/// these fields into the wait before the next attempt.
#[derive(Debug, Clone, Copy)]
pub struct RetryPolicy {
    /// Retries permitted after the initial attempt; `submit_with_retry` gives
    /// up once its attempt counter exceeds this value.
    pub max_retries: u32,
    /// Starting backoff handed to `backoff_delay`.
    pub base_backoff: Duration,
    /// Upper backoff bound handed to `backoff_delay`.
    pub max_backoff: Duration,
    /// Overall time budget: retrying stops when the elapsed time plus the
    /// next delay would exceed it.
    pub deadline: Duration,
    /// Jitter setting interpreted by `backoff_delay` (defined outside this file).
    pub jitter: f64,
}

impl Default for RetryPolicy {
    /// Five retries, 500 ms base backoff, 30 s backoff cap, 120 s deadline,
    /// and a jitter of 0.5.
    fn default() -> Self {
        let base_backoff = Duration::from_millis(500);
        let max_backoff = Duration::from_secs(30);
        let deadline = Duration::from_secs(120);
        Self {
            max_retries: 5,
            base_backoff,
            max_backoff,
            deadline,
            jitter: 0.5,
        }
    }
}
/// Polling and paging behaviour used while following a stored result.
#[derive(Debug, Clone, Copy)]
pub struct PollPolicy {
    /// Delay before the second status poll; `wait_for_result` doubles it
    /// after every poll.
    pub base_backoff: Duration,
    /// Ceiling applied to the doubled poll delay.
    pub max_backoff: Duration,
    /// Time budget for the result to become ready before `wait_for_result`
    /// reports a timeout.
    pub deadline: Duration,
    /// Rows requested per page by `fetch_all_rows` (raised to at least 1 there).
    pub page_size: i32,
}

impl Default for PollPolicy {
    /// 500 ms base backoff, 5 s backoff cap, 120 s deadline, 50 000-row pages.
    fn default() -> Self {
        let base_backoff = Duration::from_millis(500);
        let max_backoff = Duration::from_secs(5);
        let deadline = Duration::from_secs(120);
        Self {
            base_backoff,
            max_backoff,
            deadline,
            page_size: 50_000,
        }
    }
}
/// Client-side settings that govern how a query is submitted and how a
/// truncated response is followed up.
#[derive(Debug, Clone)]
pub struct QueryConfig {
    /// Retry behaviour for the submission request.
    pub retry: RetryPolicy,
    /// Polling and paging behaviour for the stored result.
    pub poll: PollPolicy,
    /// When `true`, a truncated response is replaced by the full result;
    /// when `false`, the preview is returned unchanged.
    pub auto_follow: bool,
    /// Row limit for auto-following; `None` disables the row guard.
    pub max_auto_rows: Option<u64>,
    /// Estimated-byte limit for auto-following; `None` disables the byte guard.
    pub max_auto_bytes: Option<u64>,
}

impl Default for QueryConfig {
    /// Default retry and poll policies, auto-follow enabled, and the
    /// `DEFAULT_MAX_AUTO_ROWS` / `DEFAULT_MAX_AUTO_BYTES` limits.
    fn default() -> Self {
        Self {
            retry: Default::default(),
            poll: Default::default(),
            auto_follow: true,
            max_auto_rows: Some(DEFAULT_MAX_AUTO_ROWS),
            max_auto_bytes: Some(DEFAULT_MAX_AUTO_BYTES),
        }
    }
}
/// Builder-style setters; each consumes the config and returns it with one
/// field replaced.
impl QueryConfig {
    /// Enables or disables automatic following of truncated responses.
    #[must_use]
    pub fn with_auto_follow(self, auto_follow: bool) -> Self {
        Self {
            auto_follow,
            ..self
        }
    }

    /// Sets the row limit for auto-following (`None` removes the limit).
    #[must_use]
    pub fn with_max_auto_rows(self, max_auto_rows: Option<u64>) -> Self {
        Self {
            max_auto_rows,
            ..self
        }
    }

    /// Sets the estimated-byte limit for auto-following (`None` removes the limit).
    #[must_use]
    pub fn with_max_auto_bytes(self, max_auto_bytes: Option<u64>) -> Self {
        Self {
            max_auto_bytes,
            ..self
        }
    }

    /// Replaces the submission retry policy.
    #[must_use]
    pub fn with_retry(self, retry: RetryPolicy) -> Self {
        Self { retry, ..self }
    }

    /// Replaces the polling policy.
    #[must_use]
    pub fn with_poll(self, poll: PollPolicy) -> Self {
        Self { poll, ..self }
    }
}
/// Which auto-materialize limit a result exceeded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TooLargeKind {
    /// The row limit was exceeded.
    Rows,
    /// The estimated-byte limit was exceeded.
    Bytes,
}

impl TooLargeKind {
    /// Name of the `QueryConfig` field that controls this limit, for use in
    /// error messages.
    fn knob(self) -> &'static str {
        match self {
            Self::Bytes => "max_auto_bytes",
            Self::Rows => "max_auto_rows",
        }
    }
}
/// Failures that occur while waiting for, or downloading, a stored result.
#[derive(Debug)]
#[non_exhaustive]
pub enum ResultError {
/// The result failed on the server: reported either through an HTTP 409
/// response or through a failed status while polling.
Failed {
result_id: String,
error_message: Option<String>,
},
/// The result did not become ready within the poll deadline; `status` is
/// the last status string the server reported.
Timeout {
result_id: String,
status: String,
deadline: Duration,
},
/// The result exceeds a row or byte limit; `observed` is the count that
/// tripped the limit and `limit` the configured ceiling.
TooLarge {
result_id: String,
kind: TooLargeKind,
observed: u64,
limit: u64,
},
/// The server returned an empty page after `fetched` rows although
/// `expected` rows were announced.
Incomplete {
result_id: String,
fetched: i64,
expected: i64,
},
/// The response was truncated but carried no `result_id` to follow;
/// `warning` is the warning text from the response, if any.
Unavailable {
warning: Option<String>,
},
}
impl std::fmt::Display for ResultError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ResultError::Failed {
result_id,
error_message,
} => match error_message {
Some(msg) => write!(f, "result {result_id} failed: {msg}"),
None => write!(f, "result {result_id} failed"),
},
ResultError::Timeout {
result_id,
status,
deadline,
} => write!(
f,
"result {result_id} did not become ready within {deadline:?} \
(last status: {status})"
),
ResultError::TooLarge {
result_id,
kind,
observed,
limit,
} => {
let desc = match kind {
TooLargeKind::Bytes => format!("~{observed} bytes (limit {limit})"),
TooLargeKind::Rows => format!("{observed} rows (limit {limit})"),
};
write!(
f,
"result {result_id} exceeds the auto-materialize limit: {desc}. \
Stream it with Client::stream_result_arrow, or raise (or set to \
None) {}.",
kind.knob()
)
}
ResultError::Incomplete {
result_id,
fetched,
expected,
} => write!(
f,
"result {result_id} pagination stalled: fetched {fetched} of {expected} \
rows before the server returned an empty page"
),
ResultError::Unavailable { warning } => {
write!(
f,
"query result is truncated but no result_id is available to fetch \
the full result"
)?;
if let Some(w) = warning {
write!(f, ": {w}")?;
}
write!(f, ". Re-run with auto_follow disabled to use the preview.")
}
}
}
}
impl std::error::Error for ResultError {}
/// Everything that can go wrong while running a query through `execute_query`.
#[derive(Debug)]
#[non_exhaustive]
pub enum QueryError {
/// The server kept answering HTTP 429 until the retry count or deadline
/// ran out; `attempts` is the number of requests sent and `source` holds
/// the last 429 response.
Overloaded {
attempts: u32,
source: Box<Error<GeneratedQueryError>>,
},
/// The submission failed: a request error, a non-429 error status, or a
/// response body that could not be decoded.
Submit(Error<GeneratedQueryError>),
/// The request had `async` set to `Some(true)`; rejected before sending.
AsyncRequested,
/// The server answered HTTP 202 with an async response instead of rows.
Async(Box<AsyncQueryResponse>),
/// A results-API call failed while polling or paging the full result.
Poll(Error<GetResultError>),
/// A result-level failure; see `ResultError`.
Result(ResultError),
}
impl From<ResultError> for QueryError {
fn from(e: ResultError) -> Self {
QueryError::Result(e)
}
}
impl std::fmt::Display for QueryError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
QueryError::Overloaded { attempts, .. } => write!(
f,
"server overloaded: HTTP 429 OVERLOADED, gave up after {attempts} attempt(s)"
),
QueryError::Submit(e) => write!(f, "query submission failed: {e}"),
QueryError::AsyncRequested => write!(
f,
"query() is the synchronous-results path, but this request set \
async=true; use submit_query() to drive an async submission"
),
QueryError::Async(resp) => write!(
f,
"query was submitted asynchronously (query_run_id={}); use submit_query \
to drive an async submission",
resp.query_run_id
),
QueryError::Poll(e) => write!(f, "failed to follow truncated result: {e}"),
QueryError::Result(e) => write!(f, "{e}"),
}
}
}
impl std::error::Error for QueryError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
QueryError::Overloaded { source, .. } => Some(source.as_ref()),
QueryError::Submit(e) => Some(e),
QueryError::Poll(e) => Some(e),
QueryError::Result(e) => Some(e),
QueryError::Async(_) | QueryError::AsyncRequested => None,
}
}
}
/// Outcome of a successful submission, as classified by `interpret_response`.
enum Submission {
// Any non-202 success status: the decoded response body.
Inline(QueryResponse),
// HTTP 202: the decoded async response.
Async(AsyncQueryResponse),
}
/// Uninterpreted HTTP response produced by `send_query`.
struct RawResponse {
// Status code of the response.
status: StatusCode,
// Value extracted from the response by `parse_retry_after`, if any.
retry_after: Option<Duration>,
// Response body as text.
body: String,
}
/// Runs a query on the synchronous path and returns its rows.
///
/// A request with `async` set to `Some(true)` is rejected with
/// `QueryError::AsyncRequested` before anything is sent. Otherwise the query
/// is submitted with the retry policy from `qc`. An async (HTTP 202) answer
/// becomes `QueryError::Async`. An inline answer is returned as-is unless it
/// is truncated and `qc.auto_follow` is set, in which case the full result is
/// fetched via `materialize_full`.
pub(crate) async fn execute_query(
    config: &Configuration,
    request: QueryRequest,
    x_database_id: Option<&str>,
    qc: &QueryConfig,
) -> Result<QueryResponse, QueryError> {
    if matches!(request.r#async, Some(true)) {
        return Err(QueryError::AsyncRequested);
    }

    let submission = submit_with_retry(config, &request, x_database_id, &qc.retry).await?;
    let resp = match submission {
        Submission::Inline(resp) => resp,
        Submission::Async(pending) => return Err(QueryError::Async(Box::new(pending))),
    };

    if qc.auto_follow && resp.truncated {
        materialize_full(config, resp, qc).await
    } else {
        Ok(resp)
    }
}
/// Submits the query, retrying for as long as the server answers HTTP 429.
///
/// Any non-429 response is handed to `interpret_response`. A 429 is retried
/// after the delay computed by `backoff_delay`, unless the attempt counter
/// already exceeds `retry.max_retries` or sleeping would overrun
/// `retry.deadline`; in both cases the last 429 is returned as
/// `QueryError::Overloaded`. Request failures surface as `QueryError::Submit`.
async fn submit_with_retry(
    config: &Configuration,
    request: &QueryRequest,
    x_database_id: Option<&str>,
    retry: &RetryPolicy,
) -> Result<Submission, QueryError> {
    let started = Instant::now();
    let mut attempt: u32 = 0;
    loop {
        attempt += 1;
        let raw = match send_query(config, request, x_database_id).await {
            Ok(raw) => raw,
            Err(cause) => return Err(QueryError::Submit(cause)),
        };
        if raw.status != HTTP_TOO_MANY_REQUESTS {
            return interpret_response(raw);
        }

        // `None` means "stop retrying": out of retries, or the next sleep
        // would cross the deadline.
        let wait = if attempt > retry.max_retries {
            None
        } else {
            let delay = backoff_delay(retry, attempt, raw.retry_after);
            if started.elapsed() + delay > retry.deadline {
                None
            } else {
                Some(delay)
            }
        };

        match wait {
            Some(delay) => tokio::time::sleep(delay).await,
            None => return Err(overloaded(attempt, raw)),
        }
    }
}
fn interpret_response(raw: RawResponse) -> Result<Submission, QueryError> {
if raw.status == StatusCode::ACCEPTED {
let async_resp: AsyncQueryResponse =
serde_json::from_str(&raw.body).map_err(|e| QueryError::Submit(Error::from(e)))?;
return Ok(Submission::Async(async_resp));
}
if raw.status.is_success() {
let resp: QueryResponse =
serde_json::from_str(&raw.body).map_err(|e| QueryError::Submit(Error::from(e)))?;
return Ok(Submission::Inline(resp));
}
let entity: Option<GeneratedQueryError> = serde_json::from_str(&raw.body).ok();
Err(QueryError::Submit(Error::ResponseError(ResponseContent {
status: raw.status,
content: raw.body,
entity,
})))
}
fn overloaded(attempts: u32, raw: RawResponse) -> QueryError {
let entity: Option<GeneratedQueryError> = serde_json::from_str(&raw.body).ok();
QueryError::Overloaded {
attempts,
source: Box::new(Error::ResponseError(ResponseContent {
status: raw.status,
content: raw.body,
entity,
})),
}
}
/// Sends one `POST {base_path}/v1/query` request and returns the raw outcome.
///
/// The status is not interpreted here: the status code, the value from
/// `parse_retry_after`, and the body text are handed back as a `RawResponse`.
/// Errors come from building the request, executing it, or reading the body.
async fn send_query(
config: &Configuration,
request: &QueryRequest,
x_database_id: Option<&str>,
) -> Result<RawResponse, Error<GeneratedQueryError>> {
let uri_str = format!("{}/v1/query", config.base_path);
let mut req_builder = config.client.request(reqwest::Method::POST, &uri_str);
// Optional headers: user agent and target database.
if let Some(ref user_agent) = config.user_agent {
req_builder = req_builder.header(reqwest::header::USER_AGENT, user_agent.clone());
}
if let Some(param_value) = x_database_id {
req_builder = req_builder.header("X-Database-Id", param_value.to_string());
}
// Authentication: configured API-key headers plus a bearer token when one resolves.
req_builder = apply_apikey_headers(req_builder, config);
if let Some(token) = config.resolve_bearer_token().await {
req_builder = req_builder.bearer_auth(token);
}
req_builder = req_builder.json(request);
let req = req_builder.build()?;
crate::http_log::log_request(&req);
let resp = config.client.execute(req).await?;
let status = resp.status();
crate::http_log::log_response_status(status);
// Read Retry-After before `text()` consumes the response.
let retry_after = parse_retry_after(&resp);
let body = resp.text().await?;
crate::http_log::log_response_body(&body);
Ok(RawResponse {
status,
retry_after,
body,
})
}
/// Adds the workspace and session headers for which `config.api_keys` has an
/// entry.
///
/// The header value is the key itself, or `"<prefix> <key>"` when the entry
/// has a prefix. Headers without a configured key are skipped.
fn apply_apikey_headers(
    mut req_builder: reqwest::RequestBuilder,
    config: &Configuration,
) -> reqwest::RequestBuilder {
    for header in [WORKSPACE_ID_HEADER, SESSION_ID_HEADER] {
        let apikey = match config.api_keys.get(header) {
            Some(found) => found,
            None => continue,
        };
        let value = match apikey.prefix.as_deref() {
            None => apikey.key.clone(),
            Some(prefix) => format!("{prefix} {}", apikey.key),
        };
        req_builder = req_builder.header(header, value);
    }
    req_builder
}
/// Polls a stored result until it is ready.
///
/// Each poll asks for the result with a limit of 0. A ready status returns
/// the response. A failed status, or an HTTP 409, yields
/// `ResultError::Failed` with the server's error message when available.
/// Other API errors become `QueryError::Poll`. Between polls the function
/// sleeps, doubling the delay up to `poll.max_backoff`; if the next sleep
/// would overrun `poll.deadline`, it returns `ResultError::Timeout` with the
/// last status seen.
pub(crate) async fn wait_for_result(
    config: &Configuration,
    result_id: &str,
    poll: &PollPolicy,
) -> Result<crate::models::GetResultResponse, QueryError> {
    // Both failure paths build the same error shape.
    let failed = |error_message: Option<String>| -> QueryError {
        ResultError::Failed {
            result_id: result_id.to_owned(),
            error_message,
        }
        .into()
    };

    let started = Instant::now();
    let mut delay = poll.base_backoff;
    loop {
        let polled = results_api::get_result(config, result_id, None, Some(0), None).await;
        let result = match polled {
            Ok(result) => result,
            Err(Error::ResponseError(rc)) if rc.status == StatusCode::CONFLICT => {
                let error_message = match rc.entity {
                    Some(GetResultError::Status409(r)) => r.error_message.flatten(),
                    _ => None,
                };
                return Err(failed(error_message));
            }
            Err(cause) => return Err(QueryError::Poll(cause)),
        };

        match ResultStatus::parse(&result.status) {
            ResultStatus::Ready => return Ok(result),
            ResultStatus::Failed => return Err(failed(result.error_message.flatten())),
            _ => {}
        }

        // Give up instead of sleeping past the deadline.
        if started.elapsed() + delay > poll.deadline {
            let timeout = ResultError::Timeout {
                result_id: result_id.to_owned(),
                status: result.status,
                deadline: poll.deadline,
            };
            return Err(timeout.into());
        }

        tokio::time::sleep(delay).await;
        delay = (delay * 2).min(poll.max_backoff);
    }
}
/// Replaces the preview rows of a truncated response with the full result.
///
/// Fails with `ResultError::Unavailable` when the preview has no
/// `result_id`. Otherwise waits for the result to be ready, determines the
/// total row count when possible, rejects results whose known total exceeds
/// `qc.max_auto_rows`, and then pages through all rows. The preview's
/// `total_row_count` is filled in if it was missing and a total is known.
/// All other fields of the preview are left as received.
async fn materialize_full(
    config: &Configuration,
    mut preview: QueryResponse,
    qc: &QueryConfig,
) -> Result<QueryResponse, QueryError> {
    let result_id = preview
        .result_id
        .clone()
        .flatten()
        .ok_or_else(|| ResultError::Unavailable {
            warning: preview.warning.clone().flatten(),
        })?;

    wait_for_result(config, &result_id, &qc.poll).await?;
    let total = authoritative_total(config, &preview).await;
    log::info!(
        target: "hotdata::query",
        "auto-following truncated result {} ({} rows) for query run {}",
        result_id,
        total.map_or_else(|| "unknown".to_owned(), |t| t.to_string()),
        preview.query_run_id,
    );

    // Cheap pre-check: refuse before downloading anything when the announced
    // total already exceeds the row limit.
    if let Some(limit) = qc.max_auto_rows {
        match total {
            Some(count) if count > 0 && count as u64 > limit => {
                return Err(ResultError::TooLarge {
                    result_id,
                    kind: TooLargeKind::Rows,
                    observed: count as u64,
                    limit,
                }
                .into());
            }
            _ => {}
        }
    }

    preview.rows = fetch_all_rows(config, &result_id, total, qc).await?;
    if let (None, Some(count)) = (preview.total_row_count.flatten(), total) {
        preview.total_row_count = Some(Some(count));
    }
    Ok(preview)
}
/// Determines the total row count of a result, if it can be known.
///
/// Uses the preview's own `total_row_count` when present; otherwise looks up
/// the query run and uses its `row_count`. A failed lookup is treated as
/// "unknown" rather than as an error.
async fn authoritative_total(config: &Configuration, preview: &QueryResponse) -> Option<i64> {
    match preview.total_row_count.flatten() {
        Some(known) => Some(known),
        None => query_runs_api::get_query_run(config, &preview.query_run_id)
            .await
            .ok()
            .and_then(|run| run.row_count.flatten()),
    }
}
/// Pages through a result and returns every row.
///
/// Requests pages of `effective_page_size(qc.poll.page_size)` rows as JSON,
/// starting at offset 0. When `total` is known, paging stops once the offset
/// reaches it, and an empty page before that point is reported as
/// `ResultError::Incomplete`. When it is unknown, paging stops at the first
/// page shorter than the page size. The `max_auto_rows` / `max_auto_bytes`
/// limits are checked after each page and yield `ResultError::TooLarge`.
/// Request failures are returned as `QueryError::Poll`.
async fn fetch_all_rows(
config: &Configuration,
result_id: &str,
total: Option<i64>,
qc: &QueryConfig,
) -> Result<Vec<Vec<Value>>, QueryError> {
let page_size = effective_page_size(qc.poll.page_size);
let mut rows: Vec<Vec<Value>> = Vec::new();
let mut byte_estimate: u64 = 0;
let mut offset: i64 = 0;
loop {
let page = results_api::get_result(
config,
result_id,
// The API takes an i32 offset; an offset beyond i32 is reported as TooLarge.
Some(checked_offset(offset, result_id)?),
Some(page_size),
Some(ResultsFormatQuery::Json),
)
.await
.map_err(QueryError::Poll)?;
let batch = page.rows.flatten().unwrap_or_default();
let batch_len = batch.len();
// An empty page before the announced total would loop forever; report it.
if let Some(total) = total {
if batch.is_empty() && offset < total {
return Err(ResultError::Incomplete {
result_id: result_id.to_owned(),
fetched: offset,
expected: total,
}
.into());
}
}
// Only pay for the size estimate when a byte limit is configured.
if qc.max_auto_bytes.is_some() {
byte_estimate += estimate_rows_bytes(&batch);
}
rows.extend(batch);
if let Some(max) = qc.max_auto_rows {
if rows.len() as u64 > max {
return Err(ResultError::TooLarge {
result_id: result_id.to_owned(),
kind: TooLargeKind::Rows,
observed: rows.len() as u64,
limit: max,
}
.into());
}
}
if let Some(max) = qc.max_auto_bytes {
if byte_estimate > max {
return Err(ResultError::TooLarge {
result_id: result_id.to_owned(),
kind: TooLargeKind::Bytes,
observed: byte_estimate,
limit: max,
}
.into());
}
}
offset += batch_len as i64;
// Known total: keep paging until the offset reaches it.
if let Some(total) = total {
if offset >= total {
break;
}
continue;
}
// Unknown total: a short page marks the end.
if batch_len < page_size as usize {
break;
}
}
Ok(rows)
}
/// Raises a zero or negative page size to 1; positive sizes pass through.
fn effective_page_size(page_size: i32) -> i32 {
    if page_size < 1 {
        1
    } else {
        page_size
    }
}
/// Converts a row offset to `i32`.
///
/// An offset that does not fit is reported as a rows `ResultError::TooLarge`
/// whose limit is `i32::MAX` and whose observed value is the offset (negative
/// offsets are reported as 0).
fn checked_offset(offset: i64, result_id: &str) -> Result<i32, ResultError> {
    match i32::try_from(offset) {
        Ok(narrowed) => Ok(narrowed),
        Err(_) => Err(ResultError::TooLarge {
            result_id: result_id.to_owned(),
            kind: TooLargeKind::Rows,
            observed: offset.max(0) as u64,
            limit: i32::MAX as u64,
        }),
    }
}
/// Rough size estimate for a batch of rows: `cell_len` plus 2 for every cell,
/// plus 2 for every row.
fn estimate_rows_bytes(batch: &[Vec<Value>]) -> u64 {
    batch
        .iter()
        .map(|row| row.iter().map(|cell| cell_len(cell) + 2).sum::<u64>() + 2)
        .sum()
}
/// Estimated size in bytes of a single JSON cell.
///
/// `null` counts as 4 (the length of the literal), a string counts as its
/// UTF-8 byte length (without quotes or escaping), and every other value
/// counts as the length of its `to_string()` rendering.
fn cell_len(v: &Value) -> u64 {
    match v {
        Value::Null => 4,
        // Bytes, not characters: this feeds the `max_auto_bytes` guard, and a
        // character count would undercount multi-byte UTF-8 text.
        Value::String(s) => s.len() as u64,
        other => other.to_string().len() as u64,
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::apis::configuration::ApiKey;
use crate::client::Client;
use serde_json::json;
use wiremock::matchers::{method, path, query_param};
use wiremock::{Mock, MockServer, ResponseTemplate};
// Builds a `Client` that targets `base_url` with a fixed user agent, bearer
// token and workspace API key, and installs `qc` as its query config.
fn test_client(base_url: &str, qc: QueryConfig) -> Client {
let mut configuration = Configuration {
base_path: base_url.to_owned(),
user_agent: Some("hotdata-rust-test".to_owned()),
bearer_access_token: Some("test-bearer".to_owned()),
..Configuration::default()
};
configuration.api_keys.insert(
WORKSPACE_ID_HEADER.to_owned(),
ApiKey {
prefix: None,
key: "ws_test".to_owned(),
},
);
Client::from_configuration(configuration).with_query_config(qc)
}
// Query config with millisecond backoffs, no jitter and a page size of 2, so
// retry and pagination paths run quickly; other fields keep their defaults.
fn fast_config() -> QueryConfig {
QueryConfig {
retry: RetryPolicy {
max_retries: 5,
base_backoff: Duration::from_millis(1),
max_backoff: Duration::from_millis(5),
deadline: Duration::from_secs(30),
jitter: 0.0,
},
poll: PollPolicy {
base_backoff: Duration::from_millis(1),
max_backoff: Duration::from_millis(5),
deadline: Duration::from_secs(30),
page_size: 2,
},
..QueryConfig::default()
}
}
// Builds a query-response body with a single row `[[1]]` for query run
// "qrun1"; `result_id` and `total_row_count` are only included when given.
fn preview_json(truncated: bool, result_id: Option<&str>, total: Option<i64>) -> Value {
let mut body = json!({
"columns": ["x"],
"execution_time_ms": 1,
"nullable": [false],
"preview_row_count": 1,
"query_run_id": "qrun1",
"row_count": 1,
"rows": [[1]],
"truncated": truncated,
});
if let Some(id) = result_id {
body["result_id"] = json!(id);
}
if let Some(t) = total {
body["total_row_count"] = json!(t);
}
body
}
// The query request shared by all tests.
fn req() -> QueryRequest {
QueryRequest::new("SELECT 1 AS x".to_owned())
}
// A non-truncated response is returned as-is (no results endpoint is mocked).
#[tokio::test]
async fn passthrough_when_not_truncated() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/query"))
.respond_with(ResponseTemplate::new(200).set_body_json(preview_json(
false,
Some("rslt1"),
None,
)))
.mount(&server)
.await;
let client = test_client(&server.uri(), fast_config());
let resp = client.query(req()).await.expect("query should succeed");
assert_eq!(resp.query_run_id, "qrun1");
assert!(!resp.truncated);
}
// An HTTP 400 surfaces as `QueryError::Submit` carrying the response status.
#[tokio::test]
async fn non_429_error_propagates_as_submit() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/query"))
.respond_with(ResponseTemplate::new(400).set_body_json(json!({
"error": {"code": "BAD_REQUEST", "message": "nope"}
})))
.mount(&server)
.await;
let client = test_client(&server.uri(), fast_config());
let err = client.query(req()).await.expect_err("should error");
match err {
QueryError::Submit(Error::ResponseError(rc)) => {
assert_eq!(rc.status, StatusCode::BAD_REQUEST);
}
other => panic!("expected Submit ResponseError, got {other:?}"),
}
}
// Two 429 responses (Retry-After: 0) followed by a 200: the query succeeds.
#[tokio::test]
async fn retry_then_succeed_honoring_retry_after() {
let server = MockServer::start().await;
// The 429 mock is limited to two matches; later requests reach the 200 mock.
Mock::given(method("POST"))
.and(path("/v1/query"))
.respond_with(
ResponseTemplate::new(429)
.insert_header("Retry-After", "0")
.set_body_json(json!({"error": {"code": "OVERLOADED"}})),
)
.up_to_n_times(2)
.mount(&server)
.await;
Mock::given(method("POST"))
.and(path("/v1/query"))
.respond_with(ResponseTemplate::new(200).set_body_json(preview_json(
false,
Some("rslt1"),
None,
)))
.mount(&server)
.await;
let client = test_client(&server.uri(), fast_config());
let resp = client
.query(req())
.await
.expect("should succeed after retries");
assert_eq!(resp.query_run_id, "qrun1");
}
// With `max_retries: 2` and a server that always answers 429, the query fails
// with `Overloaded` after three attempts (the initial one plus two retries).
#[tokio::test]
async fn retries_exhausted_by_max_retries() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/query"))
.respond_with(
ResponseTemplate::new(429).set_body_json(json!({"error": {"code": "OVERLOADED"}})),
)
.mount(&server)
.await;
let qc = QueryConfig {
retry: RetryPolicy {
max_retries: 2,
base_backoff: Duration::from_millis(1),
max_backoff: Duration::from_millis(2),
deadline: Duration::from_secs(30),
jitter: 0.0,
},
..fast_config()
};
let client = test_client(&server.uri(), qc);
let err = client
.query(req())
.await
.expect_err("should exhaust retries");
match err {
QueryError::Overloaded { attempts, .. } => assert_eq!(attempts, 3),
other => panic!("expected Overloaded, got {other:?}"),
}
}
// With a 10 ms retry deadline and a 429 carrying `Retry-After: 100`, the query
// fails with `Overloaded` after a single attempt even though retries remain.
#[tokio::test]
async fn retries_exhausted_by_deadline() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/query"))
.respond_with(
ResponseTemplate::new(429)
.insert_header("Retry-After", "100")
.set_body_json(json!({"error": {"code": "OVERLOADED"}})),
)
.mount(&server)
.await;
let qc = QueryConfig {
retry: RetryPolicy {
max_retries: 10,
base_backoff: Duration::from_millis(1),
max_backoff: Duration::from_secs(1),
deadline: Duration::from_millis(10),
jitter: 0.0,
},
..fast_config()
};
let client = test_client(&server.uri(), qc);
let err = client
.query(req())
.await
.expect_err("should exhaust deadline");
assert!(matches!(err, QueryError::Overloaded { attempts: 1, .. }));
}
// A truncated preview with total 3 is followed: one readiness poll, then two
// pages (2 rows + 1 row) are merged into the response.
#[tokio::test]
async fn auto_follow_materializes_full_rows() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/query"))
.respond_with(ResponseTemplate::new(200).set_body_json(preview_json(
true,
Some("rslt1"),
Some(3),
)))
.mount(&server)
.await;
// Readiness poll (limit=0).
Mock::given(method("GET"))
.and(path("/v1/results/rslt1"))
.and(query_param("limit", "0"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"result_id": "rslt1", "status": "ready"
})))
.mount(&server)
.await;
// First page.
Mock::given(method("GET"))
.and(path("/v1/results/rslt1"))
.and(query_param("offset", "0"))
.and(query_param("limit", "2"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"result_id": "rslt1", "status": "ready", "rows": [[1], [2]]
})))
.mount(&server)
.await;
// Second, final page.
Mock::given(method("GET"))
.and(path("/v1/results/rslt1"))
.and(query_param("offset", "2"))
.and(query_param("limit", "2"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"result_id": "rslt1", "status": "ready", "rows": [[3]]
})))
.mount(&server)
.await;
let client = test_client(&server.uri(), fast_config());
let resp = client
.query(req())
.await
.expect("auto-follow should succeed");
assert_eq!(resp.rows.len(), 3);
assert_eq!(resp.rows[2], vec![json!(3)]);
// The `truncated` flag of the original response is left untouched.
assert!(resp.truncated);
assert_eq!(resp.total_row_count.flatten(), Some(3));
}
// When the preview has no `total_row_count`, the total comes from the query
// run's `row_count` and is written back into the response.
#[tokio::test]
async fn auto_follow_falls_back_to_query_run_total() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/query"))
.respond_with(ResponseTemplate::new(200).set_body_json(preview_json(
true,
Some("rslt1"),
None,
)))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/v1/results/rslt1"))
.and(query_param("limit", "0"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"result_id": "rslt1", "status": "ready"
})))
.mount(&server)
.await;
// Query-run lookup that supplies the row count.
Mock::given(method("GET"))
.and(path("/v1/query-runs/qrun1"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"created_at": "2026-06-15T00:00:00Z",
"id": "qrun1",
"snapshot_id": "snap1",
"sql_hash": "h",
"sql_text": "SELECT 1 AS x",
"status": "succeeded",
"row_count": 1
})))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/v1/results/rslt1"))
.and(query_param("offset", "0"))
.and(query_param("limit", "2"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"result_id": "rslt1", "status": "ready", "rows": [[1]]
})))
.mount(&server)
.await;
let client = test_client(&server.uri(), fast_config());
let resp = client
.query(req())
.await
.expect("auto-follow should succeed");
assert_eq!(resp.rows.len(), 1);
assert_eq!(resp.total_row_count.flatten(), Some(1));
}
// An announced total of 1000 rows against `max_auto_rows: 10` fails with a
// rows `TooLarge` error; no page endpoint is mocked.
#[tokio::test]
async fn auto_follow_rows_guard_pre_check() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/query"))
.respond_with(ResponseTemplate::new(200).set_body_json(preview_json(
true,
Some("rslt1"),
Some(1000),
)))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/v1/results/rslt1"))
.and(query_param("limit", "0"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"result_id": "rslt1", "status": "ready"
})))
.mount(&server)
.await;
let qc = QueryConfig {
max_auto_rows: Some(10),
..fast_config()
};
let client = test_client(&server.uri(), qc);
let err = client
.query(req())
.await
.expect_err("should trip row guard");
match err {
QueryError::Result(ResultError::TooLarge {
kind: TooLargeKind::Rows,
observed,
limit,
..
}) => {
assert_eq!(observed, 1000);
assert_eq!(limit, 10);
}
other => panic!("expected TooLarge rows, got {other:?}"),
}
}
// With no known total and `max_auto_bytes: 8`, the first page of two
// ten-character strings trips the bytes guard.
#[tokio::test]
async fn auto_follow_bytes_guard_during_pagination() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/query"))
.respond_with(ResponseTemplate::new(200).set_body_json(preview_json(
true,
Some("rslt1"),
None,
)))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/v1/results/rslt1"))
.and(query_param("limit", "0"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"result_id": "rslt1", "status": "ready"
})))
.mount(&server)
.await;
// The query run carries no `row_count`, so the total stays unknown.
Mock::given(method("GET"))
.and(path("/v1/query-runs/qrun1"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"created_at": "2026-06-15T00:00:00Z",
"id": "qrun1",
"snapshot_id": "snap1",
"sql_hash": "h",
"sql_text": "SELECT 1 AS x",
"status": "succeeded"
})))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/v1/results/rslt1"))
.and(query_param("offset", "0"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"result_id": "rslt1", "status": "ready",
"rows": [["aaaaaaaaaa"], ["bbbbbbbbbb"]]
})))
.mount(&server)
.await;
let qc = QueryConfig {
max_auto_bytes: Some(8),
..fast_config()
};
let client = test_client(&server.uri(), qc);
let err = client
.query(req())
.await
.expect_err("should trip byte guard");
assert!(matches!(
err,
QueryError::Result(ResultError::TooLarge {
kind: TooLargeKind::Bytes,
..
})
));
}
// A 409 from the readiness poll becomes `ResultError::Failed` carrying the
// server's error message.
#[tokio::test]
async fn auto_follow_failed_result_409() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/query"))
.respond_with(ResponseTemplate::new(200).set_body_json(preview_json(
true,
Some("rslt1"),
Some(3),
)))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/v1/results/rslt1"))
.and(query_param("limit", "0"))
.respond_with(ResponseTemplate::new(409).set_body_json(json!({
"result_id": "rslt1", "status": "failed", "error_message": "boom"
})))
.mount(&server)
.await;
let client = test_client(&server.uri(), fast_config());
let err = client
.query(req())
.await
.expect_err("failed result should error");
match err {
QueryError::Result(ResultError::Failed { error_message, .. }) => {
assert_eq!(error_message.as_deref(), Some("boom"));
}
other => panic!("expected Failed, got {other:?}"),
}
}
// A truncated response with a null `result_id` fails with `Unavailable`,
// carrying the response's warning text.
#[tokio::test]
async fn auto_follow_missing_result_id_is_unavailable() {
let server = MockServer::start().await;
let mut body = preview_json(true, None, None);
body["result_id"] = Value::Null; body["warning"] = json!("catalog registration failed");
Mock::given(method("POST"))
.and(path("/v1/query"))
.respond_with(ResponseTemplate::new(200).set_body_json(body))
.mount(&server)
.await;
let client = test_client(&server.uri(), fast_config());
let err = client
.query(req())
.await
.expect_err("missing result_id should error");
match err {
QueryError::Result(ResultError::Unavailable { warning }) => {
assert_eq!(warning.as_deref(), Some("catalog registration failed"));
}
other => panic!("expected Unavailable, got {other:?}"),
}
}
// With `auto_follow: false`, a truncated response is returned unchanged with
// its single preview row.
#[tokio::test]
async fn auto_follow_off_returns_preview() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/query"))
.respond_with(ResponseTemplate::new(200).set_body_json(preview_json(
true,
Some("rslt1"),
Some(3),
)))
.mount(&server)
.await;
let qc = QueryConfig {
auto_follow: false,
..fast_config()
};
let client = test_client(&server.uri(), qc);
let resp = client
.query(req())
.await
.expect("preview should pass through");
assert!(resp.truncated);
assert_eq!(resp.rows.len(), 1); }
// The server announces 5 rows but returns an empty page after 2: the query
// fails with `Incomplete { fetched: 2, expected: 5 }`.
#[tokio::test]
async fn auto_follow_incomplete_pagination() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/query"))
.respond_with(ResponseTemplate::new(200).set_body_json(preview_json(
true,
Some("rslt1"),
Some(5),
)))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/v1/results/rslt1"))
.and(query_param("limit", "0"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"result_id": "rslt1", "status": "ready"
})))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/v1/results/rslt1"))
.and(query_param("offset", "0"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"result_id": "rslt1", "status": "ready", "rows": [[1], [2]]
})))
.mount(&server)
.await;
// Second page is empty although three rows are still outstanding.
Mock::given(method("GET"))
.and(path("/v1/results/rslt1"))
.and(query_param("offset", "2"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"result_id": "rslt1", "status": "ready", "rows": []
})))
.mount(&server)
.await;
let client = test_client(&server.uri(), fast_config());
let err = client
.query(req())
.await
.expect_err("stalled pagination should error");
match err {
QueryError::Result(ResultError::Incomplete {
fetched, expected, ..
}) => {
assert_eq!(fetched, 2);
assert_eq!(expected, 5);
}
other => panic!("expected Incomplete, got {other:?}"),
}
}
// With jitter disabled, `backoff_delay` returns a supplied 7 s retry-after
// value unchanged on the first attempt.
#[test]
fn retry_after_parses_integer_seconds() {
let retry = RetryPolicy {
jitter: 0.0,
..RetryPolicy::default()
};
let d = backoff_delay(&retry, 1, Some(Duration::from_secs(7)));
assert_eq!(d, Duration::from_secs(7)); }
// Without a retry-after value and with jitter disabled, the delay doubles per
// attempt starting from `base_backoff`: 1 s, 2 s, 4 s.
#[test]
fn backoff_is_exponential_without_retry_after() {
let retry = RetryPolicy {
base_backoff: Duration::from_secs(1),
max_backoff: Duration::from_secs(100),
jitter: 0.0,
..RetryPolicy::default()
};
assert_eq!(backoff_delay(&retry, 1, None), Duration::from_secs(1));
assert_eq!(backoff_delay(&retry, 2, None), Duration::from_secs(2));
assert_eq!(backoff_delay(&retry, 3, None), Duration::from_secs(4));
}
// The byte estimate is non-zero for a one-cell row and larger for a longer cell.
#[test]
fn byte_estimate_is_positive_and_grows() {
let small = estimate_rows_bytes(&[vec![json!(1)]]);
let big = estimate_rows_bytes(&[vec![json!("aaaaaaaaaa")]]);
assert!(small > 0);
assert!(big > small);
}
// Offsets up to `i32::MAX` convert; one past it yields a rows `TooLarge`
// error reporting the offending offset and the result id.
#[test]
fn checked_offset_refuses_overflow() {
assert_eq!(checked_offset(0, "r").unwrap(), 0);
assert_eq!(checked_offset(i32::MAX as i64, "r").unwrap(), i32::MAX);
match checked_offset(i32::MAX as i64 + 1, "rslt1") {
Err(ResultError::TooLarge {
kind: TooLargeKind::Rows,
observed,
result_id,
..
}) => {
assert_eq!(observed, i32::MAX as u64 + 1);
assert_eq!(result_id, "rslt1");
}
other => panic!("expected TooLarge rows, got {other:?}"),
}
}
// Zero and negative page sizes become 1; a positive size is kept.
#[test]
fn page_size_is_clamped_to_make_progress() {
assert_eq!(effective_page_size(0), 1);
assert_eq!(effective_page_size(-5), 1);
assert_eq!(effective_page_size(50_000), 50_000);
}
// Each `with_*` setter changes only its own field; the retry policy keeps its
// default.
#[test]
fn query_config_with_setters_override_fields() {
let cfg = QueryConfig::default()
.with_auto_follow(false)
.with_max_auto_rows(None)
.with_max_auto_bytes(Some(123));
assert!(!cfg.auto_follow);
assert_eq!(cfg.max_auto_rows, None);
assert_eq!(cfg.max_auto_bytes, Some(123));
assert_eq!(cfg.retry.max_retries, RetryPolicy::default().max_retries);
}
// A request with `async = Some(true)` fails with `AsyncRequested`. No mock
// server is started; the client points at 127.0.0.1:1.
#[tokio::test]
async fn async_request_is_rejected_before_any_request() {
let client = test_client("http://127.0.0.1:1", fast_config());
let mut request = req();
request.r#async = Some(true);
let err = client
.query(request)
.await
.expect_err("async=true must be rejected by query()");
assert!(matches!(err, QueryError::AsyncRequested));
}
// `query_preview` returns a truncated response with its single preview row
// even though the config has auto-follow enabled; no results endpoint is mocked.
#[tokio::test]
async fn query_preview_does_not_follow_truncation() {
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/query"))
.respond_with(
ResponseTemplate::new(200).set_body_json(preview_json(true, Some("rslt1"), Some(3))),
)
.mount(&server)
.await;
let client = test_client(&server.uri(), fast_config());
let resp = client
.query_preview(req())
.await
.expect("preview should pass through");
assert!(resp.truncated);
assert_eq!(resp.rows.len(), 1);
}
}