use std::env;
use crate::apis::configuration::{ApiKey, Configuration};
use crate::apis::{self, Error};
use crate::auth::{TokenManager, TokenManagerOptions};
use crate::models;
/// Base URL used by `ClientBuilder` when neither the builder nor the
/// environment supplies one.
pub const DEFAULT_BASE_URL: &str = "https://api.hotdata.dev";
/// Key under which `ClientBuilder::build` stores the workspace id in
/// `Configuration::api_keys`.
pub const WORKSPACE_ID_HEADER: &str = "X-Workspace-Id";
/// Key under which `ClientBuilder::build` stores the optional session id in
/// `Configuration::api_keys`.
pub const SESSION_ID_HEADER: &str = "X-Session-Id";
/// Environment variable read as the API token when none is set on the builder.
pub const ENV_API_KEY: &str = "HOTDATA_API_KEY";
/// Environment variable read as the workspace id when none is set on the builder.
pub const ENV_WORKSPACE_ID: &str = "HOTDATA_WORKSPACE_ID";
/// Environment variable read as the base URL when the builder sets none and
/// `ENV_TEST_API_URL` is unset or blank.
pub const ENV_API_URL: &str = "HOTDATA_API_URL";
/// Environment variable that takes precedence over `ENV_API_URL`; an explicit
/// builder `base_url` still wins over both.
pub const ENV_TEST_API_URL: &str = "HOTDATA_SDK_TEST_API_URL";
/// Reasons [`ClientBuilder::build`] can refuse to produce a [`Client`].
#[derive(Debug)]
#[non_exhaustive]
pub enum ClientError {
    /// No API token was set on the builder and the `ENV_API_KEY` environment
    /// variable was unset or blank.
    MissingApiToken,
    /// No workspace id was set on the builder and the `ENV_WORKSPACE_ID`
    /// environment variable was unset or blank.
    MissingWorkspaceId,
}

impl std::fmt::Display for ClientError {
    /// Writes a message naming both the builder method and the environment
    /// variable that can supply the missing value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::MissingApiToken => f.write_fmt(format_args!(
                "no API token supplied; set it via ClientBuilder::api_token or the {ENV_API_KEY} environment variable"
            )),
            Self::MissingWorkspaceId => f.write_fmt(format_args!(
                "no workspace id supplied; set it via ClientBuilder::workspace_id or the {ENV_WORKSPACE_ID} environment variable"
            )),
        }
    }
}

impl std::error::Error for ClientError {}
/// Collects optional settings for a `Client`; every field starts as `None`
/// and is resolved (builder value, then environment, then default) in `build`.
#[derive(Debug, Default, Clone)]
pub struct ClientBuilder {
// API token handed to the `TokenManager`; falls back to `ENV_API_KEY`.
api_token: Option<String>,
// Workspace id stored under `WORKSPACE_ID_HEADER`; falls back to `ENV_WORKSPACE_ID`.
workspace_id: Option<String>,
// Optional session id stored under `SESSION_ID_HEADER`.
session_id: Option<String>,
// Explicit base URL; wins over both URL environment variables.
base_url: Option<String>,
// User-Agent value; defaults to `hotdata-rust/<crate version>`.
user_agent: Option<String>,
// Client id forwarded to `TokenManagerOptions`.
client_id: Option<String>,
// Caller-supplied HTTP client; a default `reqwest::Client` is used otherwise.
reqwest_client: Option<reqwest::Client>,
}
impl ClientBuilder {
pub fn api_token(mut self, token: impl Into<String>) -> Self {
self.api_token = Some(token.into());
self
}
pub fn workspace_id(mut self, workspace_id: impl Into<String>) -> Self {
self.workspace_id = Some(workspace_id.into());
self
}
pub fn session_id(mut self, session_id: impl Into<String>) -> Self {
self.session_id = Some(session_id.into());
self
}
pub fn base_url(mut self, base_url: impl Into<String>) -> Self {
self.base_url = Some(base_url.into());
self
}
pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
self.user_agent = Some(user_agent.into());
self
}
pub fn client_id(mut self, client_id: impl Into<String>) -> Self {
self.client_id = Some(client_id.into());
self
}
pub fn reqwest_client(mut self, client: reqwest::Client) -> Self {
self.reqwest_client = Some(client);
self
}
fn resolve_base_url(&self) -> String {
if let Some(ref url) = self.base_url {
return url.clone();
}
if let Some(url) = non_empty_env(ENV_TEST_API_URL) {
return url;
}
if let Some(url) = non_empty_env(ENV_API_URL) {
return url;
}
DEFAULT_BASE_URL.to_owned()
}
pub fn build(self) -> Result<Client, ClientError> {
let api_token = self
.api_token
.clone()
.or_else(|| non_empty_env(ENV_API_KEY))
.ok_or(ClientError::MissingApiToken)?;
let workspace_id = self
.workspace_id
.clone()
.or_else(|| non_empty_env(ENV_WORKSPACE_ID))
.ok_or(ClientError::MissingWorkspaceId)?;
let base_path = self.resolve_base_url();
let http_client = self.reqwest_client.clone().unwrap_or_default();
let user_agent = self
.user_agent
.clone()
.unwrap_or_else(|| format!("hotdata-rust/{}", env!("CARGO_PKG_VERSION")));
let mut configuration = Configuration {
base_path: base_path.clone(),
user_agent: Some(user_agent),
client: http_client.clone(),
..Configuration::default()
};
configuration.api_keys.insert(
WORKSPACE_ID_HEADER.to_owned(),
ApiKey {
prefix: None,
key: workspace_id,
},
);
if let Some(session_id) = self
.session_id
.clone()
.or_else(|| non_empty_env("HOTDATA_SESSION_ID"))
{
configuration.api_keys.insert(
SESSION_ID_HEADER.to_owned(),
ApiKey {
prefix: None,
key: session_id,
},
);
}
let token_manager = TokenManager::with_options(
api_token,
http_client,
TokenManagerOptions {
base_path,
client_id: self
.client_id
.clone()
.unwrap_or_else(|| TokenManagerOptions::default().client_id),
..TokenManagerOptions::default()
},
);
configuration.token_provider = Some(std::sync::Arc::new(token_manager));
Ok(Client { configuration })
}
}
/// Entry point of the SDK: a thin wrapper around a `Configuration` that every
/// request method and resource accessor borrows.
#[derive(Debug, Clone)]
pub struct Client {
// Base path, HTTP client, header keys and token provider used for requests.
configuration: Configuration,
}
impl Client {
pub fn builder() -> ClientBuilder {
ClientBuilder::default()
}
pub fn from_configuration(configuration: Configuration) -> Self {
Client { configuration }
}
pub fn configuration(&self) -> &Configuration {
&self.configuration
}
pub fn configuration_mut(&mut self) -> &mut Configuration {
&mut self.configuration
}
/// Runs `request` through `apis::query_api::query` using this client's
/// configuration; the trailing optional argument is passed as `None`.
pub async fn query(
    &self,
    request: models::QueryRequest,
) -> Result<models::QueryResponse, Error<apis::query_api::QueryError>> {
    let configuration = self.configuration();
    apis::query_api::query(configuration, request, None).await
}
pub async fn submit_query(
&self,
request: models::QueryRequest,
database_id: Option<&str>,
) -> Result<QueryOutcome, Error<apis::query_api::QueryError>> {
use crate::apis::ResponseContent;
let configuration = &self.configuration;
let uri_str = format!("{}/v1/query", configuration.base_path);
let mut req_builder = configuration
.client
.request(reqwest::Method::POST, &uri_str);
if let Some(ref user_agent) = configuration.user_agent {
req_builder = req_builder.header(reqwest::header::USER_AGENT, user_agent.clone());
}
if let Some(param_value) = database_id {
req_builder = req_builder.header("X-Database-Id", param_value.to_string());
}
if let Some(apikey) = configuration.api_keys.get("X-Workspace-Id") {
let key = apikey.key.clone();
let value = match apikey.prefix {
Some(ref prefix) => format!("{} {}", prefix, key),
None => key,
};
req_builder = req_builder.header("X-Workspace-Id", value);
};
if let Some(apikey) = configuration.api_keys.get("X-Session-Id") {
let key = apikey.key.clone();
let value = match apikey.prefix {
Some(ref prefix) => format!("{} {}", prefix, key),
None => key,
};
req_builder = req_builder.header("X-Session-Id", value);
};
if let Some(token) = configuration.resolve_bearer_token().await {
req_builder = req_builder.bearer_auth(token);
};
req_builder = req_builder.json(&request);
let req = req_builder.build()?;
crate::http_log::log_request(&req);
let resp = configuration.client.execute(req).await?;
let status = resp.status();
crate::http_log::log_response_status(status);
if status == reqwest::StatusCode::ACCEPTED {
let content = resp.text().await?;
crate::http_log::log_response_body(&content);
let submitted: models::AsyncQueryResponse = serde_json::from_str(&content)?;
Ok(QueryOutcome::Submitted(submitted))
} else if !status.is_client_error() && !status.is_server_error() {
let content = resp.text().await?;
crate::http_log::log_response_body(&content);
let inline: models::QueryResponse = serde_json::from_str(&content)?;
Ok(QueryOutcome::Inline(inline))
} else {
let content = resp.text().await?;
crate::http_log::log_response_body(&content);
let entity: Option<apis::query_api::QueryError> = serde_json::from_str(&content).ok();
Err(Error::ResponseError(ResponseContent {
status,
content,
entity,
}))
}
}
pub async fn upload_stream<S, B, E>(
&self,
body: S,
content_type: Option<&str>,
content_length: Option<u64>,
) -> Result<models::UploadResponse, Error<apis::uploads_api::UploadFileError>>
where
S: futures_core::Stream<Item = Result<B, E>> + Send + 'static,
bytes::Bytes: From<B>,
B: 'static,
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
{
use crate::apis::ResponseContent;
use serde::de::Error as _;
let configuration = &self.configuration;
let uri_str = format!("{}/v1/files", configuration.base_path);
let mut req_builder = configuration
.client
.request(reqwest::Method::POST, &uri_str);
if let Some(ref user_agent) = configuration.user_agent {
req_builder = req_builder.header(reqwest::header::USER_AGENT, user_agent.clone());
}
req_builder = req_builder.header(
reqwest::header::CONTENT_TYPE,
content_type.unwrap_or("application/octet-stream"),
);
if let Some(apikey) = configuration.api_keys.get("X-Workspace-Id") {
let key = apikey.key.clone();
let value = match apikey.prefix {
Some(ref prefix) => format!("{} {}", prefix, key),
None => key,
};
req_builder = req_builder.header("X-Workspace-Id", value);
};
if let Some(apikey) = configuration.api_keys.get("X-Session-Id") {
let key = apikey.key.clone();
let value = match apikey.prefix {
Some(ref prefix) => format!("{} {}", prefix, key),
None => key,
};
req_builder = req_builder.header("X-Session-Id", value);
};
if let Some(token) = configuration.resolve_bearer_token().await {
req_builder = req_builder.bearer_auth(token);
};
if let Some(len) = content_length {
req_builder = req_builder.header(reqwest::header::CONTENT_LENGTH, len);
}
req_builder = req_builder.body(reqwest::Body::wrap_stream(body));
let req = req_builder.build()?;
crate::http_log::log_request(&req);
let resp = configuration.client.execute(req).await?;
let status = resp.status();
crate::http_log::log_response_status(status);
let content_type = resp
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok())
.unwrap_or("application/octet-stream")
.to_owned();
let is_json =
content_type.starts_with("application") && content_type.contains("json");
if !status.is_client_error() && !status.is_server_error() {
let content = resp.text().await?;
crate::http_log::log_response_body(&content);
if is_json {
serde_json::from_str(&content).map_err(Error::from)
} else {
Err(Error::from(serde_json::Error::custom(format!(
"Received `{content_type}` content type response that cannot be converted to `models::UploadResponse`"
))))
}
} else {
let content = resp.text().await?;
crate::http_log::log_response_body(&content);
let entity: Option<apis::uploads_api::UploadFileError> =
serde_json::from_str(&content).ok();
Err(Error::ResponseError(ResponseContent {
status,
content,
entity,
}))
}
}
pub async fn list_query_runs(
&self,
limit: Option<i32>,
cursor: Option<&str>,
) -> Result<models::ListQueryRunsResponse, Error<apis::query_runs_api::ListQueryRunsError>>
{
apis::query_runs_api::list_query_runs(&self.configuration, limit, cursor, None, None).await
}
pub async fn list_results(
&self,
limit: Option<i32>,
offset: Option<i32>,
) -> Result<models::ListResultsResponse, Error<apis::results_api::ListResultsError>> {
apis::results_api::list_results(&self.configuration, limit, offset).await
}
pub async fn get_result(
&self,
id: &str,
) -> Result<models::GetResultResponse, Error<apis::results_api::GetResultError>> {
apis::results_api::get_result(&self.configuration, id, None, None, None).await
}
pub async fn list_workspaces(
&self,
organization_public_id: Option<&str>,
) -> Result<models::ListWorkspacesResponse, Error<apis::workspaces_api::ListWorkspacesError>>
{
apis::workspaces_api::list_workspaces(&self.configuration, organization_public_id).await
}
/// Fetches result `id` through `crate::arrow::get_result_arrow`, forwarding
/// `offset` and `limit`. Only compiled with the `arrow` feature.
#[cfg(feature = "arrow")]
pub async fn get_result_arrow(
    &self,
    id: &str,
    offset: Option<i64>,
    limit: Option<i64>,
) -> Result<crate::arrow::ArrowResult, crate::arrow::ArrowError> {
    let configuration = self.configuration();
    crate::arrow::get_result_arrow(configuration, id, offset, limit).await
}

/// Opens result `id` as a batch stream through
/// `crate::arrow::stream_result_arrow`, forwarding `offset` and `limit`.
/// Only compiled with the `arrow` feature.
#[cfg(feature = "arrow")]
pub async fn stream_result_arrow(
    &self,
    id: &str,
    offset: Option<i64>,
    limit: Option<i64>,
) -> Result<crate::arrow::ArrowBatchStream, crate::arrow::ArrowError> {
    let configuration = self.configuration();
    crate::arrow::stream_result_arrow(configuration, id, offset, limit).await
}
/// Returns a `DatasetsApi` handle borrowing this client's configuration.
pub fn datasets(&self) -> crate::resources::DatasetsApi<'_> {
    crate::resources::DatasetsApi::new(self.configuration())
}

/// Returns a `ConnectionsApi` handle borrowing this client's configuration.
pub fn connections(&self) -> crate::resources::ConnectionsApi<'_> {
    crate::resources::ConnectionsApi::new(self.configuration())
}

/// Returns a `ConnectionTypesApi` handle borrowing this client's configuration.
pub fn connection_types(&self) -> crate::resources::ConnectionTypesApi<'_> {
    crate::resources::ConnectionTypesApi::new(self.configuration())
}

/// Returns a `DatabaseContextApi` handle borrowing this client's configuration.
pub fn database_context(&self) -> crate::resources::DatabaseContextApi<'_> {
    crate::resources::DatabaseContextApi::new(self.configuration())
}

/// Returns a `DatabasesApi` handle borrowing this client's configuration.
pub fn databases(&self) -> crate::resources::DatabasesApi<'_> {
    crate::resources::DatabasesApi::new(self.configuration())
}

/// Returns an `EmbeddingProvidersApi` handle borrowing this client's configuration.
pub fn embedding_providers(&self) -> crate::resources::EmbeddingProvidersApi<'_> {
    crate::resources::EmbeddingProvidersApi::new(self.configuration())
}

/// Returns an `IndexesApi` handle borrowing this client's configuration.
pub fn indexes(&self) -> crate::resources::IndexesApi<'_> {
    crate::resources::IndexesApi::new(self.configuration())
}

/// Returns an `InformationSchemaApi` handle borrowing this client's configuration.
pub fn information_schema(&self) -> crate::resources::InformationSchemaApi<'_> {
    crate::resources::InformationSchemaApi::new(self.configuration())
}

/// Returns a `JobsApi` handle borrowing this client's configuration.
pub fn jobs(&self) -> crate::resources::JobsApi<'_> {
    crate::resources::JobsApi::new(self.configuration())
}

/// Returns a `QueryApi` handle borrowing this client's configuration.
pub fn queries(&self) -> crate::resources::QueryApi<'_> {
    crate::resources::QueryApi::new(self.configuration())
}

/// Returns a `QueryRunsApi` handle borrowing this client's configuration.
pub fn query_runs(&self) -> crate::resources::QueryRunsApi<'_> {
    crate::resources::QueryRunsApi::new(self.configuration())
}

/// Returns a `ResultsApi` handle borrowing this client's configuration.
pub fn results(&self) -> crate::resources::ResultsApi<'_> {
    crate::resources::ResultsApi::new(self.configuration())
}

/// Returns a `RefreshApi` handle borrowing this client's configuration.
pub fn refresh(&self) -> crate::resources::RefreshApi<'_> {
    crate::resources::RefreshApi::new(self.configuration())
}

/// Returns a `SandboxesApi` handle borrowing this client's configuration.
pub fn sandboxes(&self) -> crate::resources::SandboxesApi<'_> {
    crate::resources::SandboxesApi::new(self.configuration())
}

/// Returns a `SavedQueriesApi` handle borrowing this client's configuration.
pub fn saved_queries(&self) -> crate::resources::SavedQueriesApi<'_> {
    crate::resources::SavedQueriesApi::new(self.configuration())
}

/// Returns a `SecretsApi` handle borrowing this client's configuration.
pub fn secrets(&self) -> crate::resources::SecretsApi<'_> {
    crate::resources::SecretsApi::new(self.configuration())
}

/// Returns an `UploadsApi` handle borrowing this client's configuration.
pub fn uploads(&self) -> crate::resources::UploadsApi<'_> {
    crate::resources::UploadsApi::new(self.configuration())
}

/// Returns a `WorkspacesApi` handle borrowing this client's configuration.
pub fn workspaces(&self) -> crate::resources::WorkspacesApi<'_> {
    crate::resources::WorkspacesApi::new(self.configuration())
}
/// Polls `get_result` until the result is ready, has failed, or the time
/// budget in `poll` is used up.
///
/// The result is always fetched at least once, even with a zero timeout.
/// Between polls the task sleeps for `poll.interval`, shortened to whatever
/// time remains so the call does not run past `poll.timeout` by a whole
/// interval. A timeout too large to be added to the current instant is
/// treated as "no deadline" instead of panicking.
///
/// # Errors
///
/// * [`AwaitResultError::Api`] when fetching the result fails.
/// * [`AwaitResultError::Failed`] when the status parses as `Failed`.
/// * [`AwaitResultError::Timeout`] when the deadline passes while the status
///   is still neither `Ready` nor `Failed`.
pub async fn await_result(
    &self,
    result_id: &str,
    poll: PollConfig,
) -> Result<models::GetResultResponse, AwaitResultError> {
    use std::time::Instant;

    // `None` means the deadline is not representable, i.e. wait indefinitely.
    let deadline = Instant::now().checked_add(poll.timeout);
    loop {
        let result = self
            .get_result(result_id)
            .await
            .map_err(AwaitResultError::Api)?;
        match crate::status::ResultStatus::parse(&result.status) {
            crate::status::ResultStatus::Ready => return Ok(result),
            crate::status::ResultStatus::Failed => {
                return Err(AwaitResultError::Failed {
                    result_id: result_id.to_owned(),
                    error_message: result.error_message.flatten(),
                });
            }
            // Any other status means "keep waiting".
            _ => {}
        }

        let mut wait = poll.interval;
        if let Some(deadline) = deadline {
            // Saturates to zero exactly when the deadline has been reached.
            let remaining = deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                return Err(AwaitResultError::Timeout {
                    result_id: result_id.to_owned(),
                    last_status: result.status,
                    waited: poll.timeout,
                });
            }
            // Never sleep past the deadline; the next iteration performs one
            // final poll right at the deadline.
            wait = wait.min(remaining);
        }
        tokio::time::sleep(wait).await;
    }
}
/// Runs `request` via [`Client::query`], then fetches the persisted result
/// through `get_result_arrow`, retrying while it reports `NotReady`.
///
/// Between attempts the task sleeps for the `retry_after` value from the
/// `NotReady` error (interpreted as seconds) when present, otherwise for
/// `poll.interval`. Either way the sleep is capped at the time left before
/// `poll.timeout` elapses. A timeout too large to be added to the current
/// instant is treated as "no deadline" instead of panicking.
///
/// # Errors
///
/// * [`QueryToArrowError::Query`] when the query call fails.
/// * [`QueryToArrowError::NoResultId`] when the response has no result id.
/// * [`QueryToArrowError::Timeout`] when the deadline passes while the
///   result is still not ready.
/// * [`QueryToArrowError::Arrow`] for any other Arrow fetch error.
#[cfg(feature = "arrow")]
pub async fn query_to_arrow(
    &self,
    request: models::QueryRequest,
    poll: PollConfig,
    offset: Option<i64>,
    limit: Option<i64>,
) -> Result<crate::arrow::ArrowResult, QueryToArrowError> {
    use std::time::{Duration, Instant};

    let submitted = self
        .query(request)
        .await
        .map_err(QueryToArrowError::Query)?;
    let result_id =
        submitted
            .result_id
            .flatten()
            .ok_or_else(|| QueryToArrowError::NoResultId {
                warning: submitted.warning.flatten(),
            })?;

    // `None` means the deadline is not representable, i.e. wait indefinitely.
    let deadline = Instant::now().checked_add(poll.timeout);
    loop {
        match self.get_result_arrow(&result_id, offset, limit).await {
            Ok(result) => return Ok(result),
            Err(crate::arrow::ArrowError::NotReady {
                status,
                retry_after,
                ..
            }) => {
                let mut wait = retry_after
                    .map(Duration::from_secs)
                    .unwrap_or(poll.interval);
                if let Some(deadline) = deadline {
                    // Saturates to zero exactly when the deadline has been
                    // reached, so a single clock read serves both the
                    // timeout check and the sleep cap.
                    let remaining = deadline.saturating_duration_since(Instant::now());
                    if remaining.is_zero() {
                        return Err(QueryToArrowError::Timeout {
                            result_id,
                            last_status: status,
                            waited: poll.timeout,
                        });
                    }
                    wait = wait.min(remaining);
                }
                tokio::time::sleep(wait).await;
            }
            Err(e) => return Err(QueryToArrowError::Arrow(e)),
        }
    }
}
}
/// What `Client::submit_query` received from the server.
#[derive(Debug)]
#[non_exhaustive]
pub enum QueryOutcome {
/// The body of a successful response other than `202 Accepted`, parsed as a
/// `QueryResponse`.
Inline(models::QueryResponse),
/// The body of a `202 Accepted` response, parsed as an `AsyncQueryResponse`.
Submitted(models::AsyncQueryResponse),
}
/// Timing parameters for the polling helpers on `Client`.
#[derive(Debug, Clone, Copy)]
pub struct PollConfig {
    /// How long polling may continue before a timeout error is returned.
    pub timeout: std::time::Duration,
    /// Pause between consecutive polls.
    pub interval: std::time::Duration,
}

impl Default for PollConfig {
    /// A 120-second timeout with a 1-second polling interval.
    fn default() -> Self {
        use std::time::Duration;

        Self {
            timeout: Duration::from_secs(120),
            interval: Duration::from_secs(1),
        }
    }
}
/// Ways `Client::await_result` can end without a ready result.
#[derive(Debug)]
#[non_exhaustive]
pub enum AwaitResultError {
    /// Fetching the result failed at the API layer.
    Api(Error<apis::results_api::GetResultError>),
    /// The result reported a failed status.
    Failed {
        /// Id of the result that failed.
        result_id: String,
        /// Error message carried by the result, when it had one.
        error_message: Option<String>,
    },
    /// The polling deadline passed before the result became ready.
    Timeout {
        /// Id of the result that was being awaited.
        result_id: String,
        /// Raw status string from the last poll.
        last_status: String,
        /// The configured timeout that was exceeded.
        waited: std::time::Duration,
    },
}

impl std::fmt::Display for AwaitResultError {
    /// Renders a one-line description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Api(e) => write!(f, "failed to fetch result: {e}"),
            Self::Failed {
                result_id,
                error_message,
            } => {
                let detail = match error_message {
                    Some(message) => message.as_str(),
                    None => "no error message",
                };
                write!(f, "result {result_id} failed: {}", detail)
            }
            Self::Timeout {
                result_id,
                last_status,
                waited,
            } => write!(
                f,
                "result {result_id} did not become ready within {waited:?} (last status: {last_status})"
            ),
        }
    }
}

impl std::error::Error for AwaitResultError {}
/// Ways `Client::query_to_arrow` can fail. Only compiled with the `arrow`
/// feature.
#[cfg(feature = "arrow")]
#[derive(Debug)]
#[non_exhaustive]
pub enum QueryToArrowError {
    /// The initial query call failed.
    Query(Error<apis::query_api::QueryError>),
    /// The query response carried no result id to fetch.
    NoResultId {
        /// Warning from the query response, when it had one.
        warning: Option<String>,
    },
    /// The polling deadline passed while the result was still not ready.
    Timeout {
        /// Id of the result that was being fetched.
        result_id: String,
        /// Status reported by the last not-ready response.
        last_status: String,
        /// The configured timeout that was exceeded.
        waited: std::time::Duration,
    },
    /// Any other error from the Arrow fetch.
    Arrow(crate::arrow::ArrowError),
}

#[cfg(feature = "arrow")]
impl std::fmt::Display for QueryToArrowError {
    /// Renders a one-line description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Query(e) => write!(f, "query failed: {e}"),
            Self::NoResultId { warning } => {
                let detail = match warning {
                    Some(text) => text.as_str(),
                    None => "no result_id returned",
                };
                write!(
                    f,
                    "query result was not persisted, cannot fetch as Arrow: {}",
                    detail
                )
            }
            Self::Timeout {
                result_id,
                last_status,
                waited,
            } => write!(
                f,
                "result {result_id} not ready after {waited:?} (last status: {last_status})"
            ),
            Self::Arrow(e) => write!(f, "arrow decode failed: {e}"),
        }
    }
}

#[cfg(feature = "arrow")]
impl std::error::Error for QueryToArrowError {}
/// Reads environment variable `key` and returns its whitespace-trimmed
/// value, or `None` when the variable cannot be read or is blank after
/// trimming.
fn non_empty_env(key: &str) -> Option<String> {
    let raw = env::var(key).ok()?;
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// Acquires the crate-wide `ENV_LOCK` that tests hold while touching
/// environment variables.
fn env_guard() -> std::sync::MutexGuard<'static, ()> {
// A poisoned lock (an earlier test panicked while holding it) is recovered
// rather than propagated, so one failing test does not fail the rest.
crate::ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner())
}
/// Removes every environment variable listed below so each test starts from
/// a known state.
fn clear_env() {
for key in [
ENV_API_KEY,
ENV_WORKSPACE_ID,
ENV_API_URL,
ENV_TEST_API_URL,
"HOTDATA_SESSION_ID",
// Not read anywhere in this module; presumably consulted by the auth
// layer — confirm against `crate::auth`.
"HOTDATA_DISABLE_JWT_EXCHANGE",
] {
env::remove_var(key);
}
}
/// A client id set on the builder must appear in the body of the token
/// request, and the minted token must be what the configuration resolves.
#[tokio::test]
async fn builder_client_id_attributes_token_traffic() {
use wiremock::matchers::{body_string_contains, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let _g = env_guard();
clear_env();
let server = MockServer::start().await;
// The mock only matches when the request body carries the configured
// client id.
Mock::given(method("POST"))
.and(path("/v1/auth/jwt"))
.and(body_string_contains("client_id=hotdata-cli"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"access_token": "minted-jwt",
"expires_in": 300,
"refresh_token": "r1"
})))
.mount(&server)
.await;
let client = Client::builder()
.api_token("hd_opaque")
.workspace_id("ws_x")
.client_id("hotdata-cli")
.base_url(server.uri())
.build()
.expect("build should succeed");
let bearer = client.configuration().resolve_bearer_token().await;
assert_eq!(bearer.as_deref(), Some("minted-jwt"));
clear_env();
}
/// Builds a `Client` pointed at `base_url` with a fixed user agent, a static
/// bearer token (`test-bearer`) and workspace id `ws_test`, bypassing
/// `ClientBuilder`.
fn query_test_client(base_url: &str) -> Client {
let mut configuration = Configuration {
base_path: base_url.to_owned(),
user_agent: Some("hotdata-rust-test".to_owned()),
bearer_access_token: Some("test-bearer".to_owned()),
..Configuration::default()
};
configuration.api_keys.insert(
WORKSPACE_ID_HEADER.to_owned(),
ApiKey {
prefix: None,
key: "ws_test".to_owned(),
},
);
Client::from_configuration(configuration)
}
/// A `200` response from `/v1/query` must surface as `QueryOutcome::Inline`.
#[tokio::test]
async fn submit_query_200_inline() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let _g = env_guard();
clear_env();
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/query"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"columns": ["n"],
"execution_time_ms": 3,
"nullable": [false],
"query_run_id": "qr_inline",
"row_count": 1,
"rows": [[1]],
})))
.mount(&server)
.await;
let client = query_test_client(&server.uri());
let outcome = client
.submit_query(models::QueryRequest::new("select 1".into()), None)
.await
.expect("submit_query should succeed");
match outcome {
QueryOutcome::Inline(resp) => {
assert_eq!(resp.query_run_id, "qr_inline");
assert_eq!(resp.row_count, 1);
}
other => panic!("expected Inline, got {other:?}"),
}
}
/// A `202` response from `/v1/query` must surface as
/// `QueryOutcome::Submitted` with the run id, status and status URL intact.
#[tokio::test]
async fn submit_query_202_submitted() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let _g = env_guard();
clear_env();
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/query"))
.respond_with(ResponseTemplate::new(202).set_body_json(serde_json::json!({
"query_run_id": "qr_async_42",
"status": "pending",
"status_url": "https://api.hotdata.dev/v1/query-runs/qr_async_42",
})))
.mount(&server)
.await;
let client = query_test_client(&server.uri());
let outcome = client
.submit_query(
models::QueryRequest {
r#async: Some(true),
..models::QueryRequest::new("select 1".into())
},
None,
)
.await
.expect("submit_query should succeed");
match outcome {
QueryOutcome::Submitted(resp) => {
assert_eq!(resp.query_run_id, "qr_async_42");
assert_eq!(resp.status, "pending");
assert_eq!(
resp.status_url,
"https://api.hotdata.dev/v1/query-runs/qr_async_42"
);
}
other => panic!("expected Submitted, got {other:?}"),
}
}
/// `submit_query` must send the database id, workspace id and bearer token
/// headers; the mock only matches when all three are present.
#[tokio::test]
async fn submit_query_sends_scope_and_auth_headers() {
use wiremock::matchers::{header, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let _g = env_guard();
clear_env();
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/query"))
.and(header("X-Database-Id", "db_123"))
.and(header("X-Workspace-Id", "ws_test"))
.and(header("Authorization", "Bearer test-bearer"))
.respond_with(ResponseTemplate::new(202).set_body_json(serde_json::json!({
"query_run_id": "qr_scoped",
"status": "pending",
"status_url": "https://api.hotdata.dev/v1/query-runs/qr_scoped",
})))
.mount(&server)
.await;
let client = query_test_client(&server.uri());
let outcome = client
.submit_query(
models::QueryRequest::new("select 1".into()),
Some("db_123"),
)
.await
.expect("submit_query should succeed with scoped headers");
assert!(matches!(outcome, QueryOutcome::Submitted(_)));
}
/// A two-chunk stream must arrive at `/v1/files` as one concatenated body,
/// and the `201` JSON reply must be parsed into the upload response.
#[tokio::test]
async fn upload_stream_posts_and_parses_response() {
use wiremock::matchers::{body_bytes, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let _g = env_guard();
clear_env();
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/files"))
.and(body_bytes(b"hello world".to_vec()))
.respond_with(ResponseTemplate::new(201).set_body_json(serde_json::json!({
"content_type": "text/csv",
"created_at": "2026-06-04T00:00:00Z",
"id": "upload_abc",
"size_bytes": 11,
"status": "ready",
})))
.mount(&server)
.await;
let client = query_test_client(&server.uri());
let chunks: Vec<Result<bytes::Bytes, std::io::Error>> = vec![
Ok(bytes::Bytes::from_static(b"hello ")),
Ok(bytes::Bytes::from_static(b"world")),
];
let stream = futures::stream::iter(chunks);
let resp = client
.upload_stream(stream, Some("text/csv"), None)
.await
.expect("upload_stream should succeed");
assert_eq!(resp.id, "upload_abc");
assert_eq!(resp.size_bytes, 11);
assert_eq!(resp.status, "ready");
}
/// `upload_stream` must send the workspace id, bearer token and the
/// caller-supplied content type; the mock only matches when all are present.
#[tokio::test]
async fn upload_stream_sends_scope_auth_and_content_type() {
use wiremock::matchers::{header, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let _g = env_guard();
clear_env();
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/files"))
.and(header("X-Workspace-Id", "ws_test"))
.and(header("Authorization", "Bearer test-bearer"))
.and(header("Content-Type", "application/parquet"))
.respond_with(ResponseTemplate::new(201).set_body_json(serde_json::json!({
"created_at": "2026-06-04T00:00:00Z",
"id": "upload_scoped",
"size_bytes": 3,
"status": "ready",
})))
.mount(&server)
.await;
let client = query_test_client(&server.uri());
let stream = futures::stream::once(async {
Ok::<_, std::io::Error>(bytes::Bytes::from_static(b"abc"))
});
let resp = client
.upload_stream(stream, Some("application/parquet"), None)
.await
.expect("upload_stream should succeed with scoped headers");
assert_eq!(resp.id, "upload_scoped");
}
/// With no content type supplied, `upload_stream` must send
/// `application/octet-stream`.
#[tokio::test]
async fn upload_stream_defaults_content_type() {
use wiremock::matchers::{header, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let _g = env_guard();
clear_env();
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/files"))
.and(header("Content-Type", "application/octet-stream"))
.respond_with(ResponseTemplate::new(201).set_body_json(serde_json::json!({
"created_at": "2026-06-04T00:00:00Z",
"id": "upload_default",
"size_bytes": 1,
"status": "ready",
})))
.mount(&server)
.await;
let client = query_test_client(&server.uri());
let stream = futures::stream::once(async {
Ok::<_, std::io::Error>(bytes::Bytes::from_static(b"x"))
});
let resp = client
.upload_stream(stream, None, None)
.await
.expect("upload_stream should default content-type");
assert_eq!(resp.id, "upload_default");
}
/// When a content length is supplied, the request must carry that
/// `Content-Length` and must not use chunked transfer encoding.
#[tokio::test]
async fn upload_stream_sends_content_length_when_sized() {
use wiremock::matchers::{body_bytes, header, method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
let _g = env_guard();
clear_env();
let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/files"))
.and(header("Content-Length", "11"))
.and(body_bytes(b"hello world".to_vec()))
.respond_with(ResponseTemplate::new(201).set_body_json(serde_json::json!({
"content_type": "application/octet-stream",
"created_at": "2026-06-04T00:00:00Z",
"id": "upload_sized",
"size_bytes": 11,
"status": "ready",
})))
.mount(&server)
.await;
let client = query_test_client(&server.uri());
let chunks: Vec<Result<bytes::Bytes, std::io::Error>> = vec![
Ok(bytes::Bytes::from_static(b"hello ")),
Ok(bytes::Bytes::from_static(b"world")),
];
let stream = futures::stream::iter(chunks);
let resp = client
.upload_stream(stream, None, Some(11))
.await
.expect("upload_stream should succeed with a sized body");
assert_eq!(resp.id, "upload_sized");
// Inspect the recorded request directly: the matcher above cannot express
// "header must be absent".
let received = &server.received_requests().await.expect("recorded requests")[0];
assert_eq!(
received.headers.get("content-length").map(|v| v.as_bytes()),
Some(b"11".as_ref()),
);
assert!(
received.headers.get("transfer-encoding").is_none(),
"sized upload must not use chunked transfer-encoding",
);
}
/// Values set on the builder must end up in the configuration, and a token
/// provider must be installed.
#[test]
fn builder_explicit_values_win() {
let _g = env_guard();
clear_env();
let client = Client::builder()
.api_token("hd_explicit")
.workspace_id("ws_explicit")
.base_url("https://example.test")
.build()
.expect("build should succeed with explicit values");
let config = client.configuration();
assert_eq!(config.base_path, "https://example.test");
assert_eq!(
config
.api_keys
.get(WORKSPACE_ID_HEADER)
.map(|k| k.key.as_str()),
Some("ws_explicit")
);
assert!(
config.token_provider.is_some(),
"a token provider must be installed"
);
}
/// With nothing set on the builder, the token and workspace id come from the
/// environment and the base URL falls back to `DEFAULT_BASE_URL`.
#[test]
fn builder_falls_back_to_env() {
let _g = env_guard();
clear_env();
env::set_var(ENV_API_KEY, "hd_from_env");
env::set_var(ENV_WORKSPACE_ID, "ws_from_env");
let client = Client::builder().build().expect("env fallback should work");
let config = client.configuration();
assert_eq!(
config
.api_keys
.get(WORKSPACE_ID_HEADER)
.map(|k| k.key.as_str()),
Some("ws_from_env")
);
assert_eq!(config.base_path, DEFAULT_BASE_URL);
clear_env();
}
/// Builder values must win when the same settings are also present in the
/// environment.
// NOTE(review): despite the name, only the workspace id is asserted here; the
// resolved API token is not inspected.
#[test]
fn explicit_api_token_beats_env() {
let _g = env_guard();
clear_env();
env::set_var(ENV_API_KEY, "hd_from_env");
env::set_var(ENV_WORKSPACE_ID, "ws_from_env");
let client = Client::builder()
.api_token("hd_explicit")
.workspace_id("ws_explicit")
.build()
.expect("build should succeed");
assert_eq!(
client
.configuration()
.api_keys
.get(WORKSPACE_ID_HEADER)
.map(|k| k.key.as_str()),
Some("ws_explicit")
);
clear_env();
}
/// Building without a token from either source must fail with
/// `MissingApiToken`.
#[test]
fn missing_token_errors() {
let _g = env_guard();
clear_env();
let err = Client::builder()
.workspace_id("ws_only")
.build()
.expect_err("missing token must error");
assert!(matches!(err, ClientError::MissingApiToken));
}
/// Building without a workspace id from either source must fail with
/// `MissingWorkspaceId`.
#[test]
fn missing_workspace_errors() {
let _g = env_guard();
clear_env();
let err = Client::builder()
.api_token("hd_only")
.build()
.expect_err("missing workspace must error");
assert!(matches!(err, ClientError::MissingWorkspaceId));
}
/// Base URL precedence: `ENV_TEST_API_URL` beats `ENV_API_URL`, and an
/// explicit builder value beats both.
#[test]
fn test_url_override_precedence() {
let _g = env_guard();
clear_env();
env::set_var(ENV_API_KEY, "hd_x");
env::set_var(ENV_WORKSPACE_ID, "ws_x");
env::set_var(ENV_API_URL, "https://generic.test");
env::set_var(ENV_TEST_API_URL, "https://test-override.test");
let client = Client::builder().build().expect("build ok");
assert_eq!(
client.configuration().base_path,
"https://test-override.test"
);
// With the test override gone, the generic variable takes effect.
env::remove_var(ENV_TEST_API_URL);
let client = Client::builder().build().expect("build ok");
assert_eq!(client.configuration().base_path, "https://generic.test");
let client = Client::builder()
.base_url("https://explicit.test")
.build()
.expect("build ok");
assert_eq!(client.configuration().base_path, "https://explicit.test");
clear_env();
}
/// A whitespace-only environment value must count as unset.
#[test]
fn empty_env_treated_as_absent() {
let _g = env_guard();
clear_env();
env::set_var(ENV_API_KEY, " ");
env::set_var(ENV_WORKSPACE_ID, "ws_present");
let err = Client::builder()
.build()
.expect_err("whitespace-only token must be treated as absent");
assert!(matches!(err, ClientError::MissingApiToken));
clear_env();
}
/// A session id set on the builder must be stored under `SESSION_ID_HEADER`.
#[test]
fn session_id_installed_when_set() {
let _g = env_guard();
clear_env();
let client = Client::builder()
.api_token("hd_x")
.workspace_id("ws_x")
.session_id("sess_123")
.build()
.expect("build ok");
assert_eq!(
client
.configuration()
.api_keys
.get(SESSION_ID_HEADER)
.map(|k| k.key.as_str()),
Some("sess_123")
);
}
/// Without an explicit user agent, the configuration must carry
/// `hotdata-rust/<crate version>`.
#[test]
fn default_user_agent_uses_crate_version() {
let _g = env_guard();
clear_env();
let client = Client::builder()
.api_token("hd_x")
.workspace_id("ws_x")
.build()
.expect("build ok");
let ua = client.configuration().user_agent.clone().unwrap();
assert_eq!(ua, format!("hotdata-rust/{}", env!("CARGO_PKG_VERSION")));
}
}