use crate::rate_limit::{RateLimitConfig, RateLimitStats, RateLimiter};
use crate::{retry::RetryConfig, token::TokenProvider};
use std::path::Path;
use std::sync::Arc;
pub struct GcpHttpClient {
http: reqwest::Client,
token_provider: Arc<dyn TokenProvider>,
retry_config: RetryConfig,
quota_project_override: Option<String>,
rate_limiter: RateLimiter,
#[cfg(any(test, feature = "test-support"))]
pub(crate) base_url: Option<String>,
#[cfg(feature = "test-support")]
mock: Option<Arc<crate::mock_client::MockClient>>,
}
impl GcpHttpClient {
pub fn builder() -> GcpHttpClientBuilder {
GcpHttpClientBuilder::default()
}
pub fn quota_project_id(&self) -> Option<&str> {
if let Some(ref project) = self.quota_project_override {
return Some(project.as_str());
}
self.token_provider.quota_project_id()
}
pub fn rate_limit_stats(&self) -> Vec<RateLimitStats> {
self.rate_limiter.stats()
}
pub async fn from_adc() -> Result<Self, crate::auth::AdcError> {
let credential = crate::auth::AdcCredential::new().await?;
let http = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(60))
.build()
.map_err(|e| crate::auth::AdcError::HttpClientCreation { source: e })?;
Ok(Self {
http,
token_provider: Arc::new(credential),
retry_config: RetryConfig::default(),
quota_project_override: None,
rate_limiter: RateLimiter::new(RateLimitConfig::default().0),
#[cfg(any(test, feature = "test-support"))]
base_url: None,
#[cfg(feature = "test-support")]
mock: None,
})
}
pub fn from_service_account_file(
path: &Path,
) -> Result<Self, crate::auth::ServiceAccountError> {
let credential = crate::auth::ServiceAccountCredential::from_file(path)?;
let http = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(60))
.build()
.map_err(|e| crate::auth::ServiceAccountError::HttpClientCreation { source: e })?;
Ok(Self {
http,
token_provider: Arc::new(credential),
retry_config: RetryConfig::default(),
quota_project_override: None,
rate_limiter: RateLimiter::new(RateLimitConfig::default().0),
#[cfg(any(test, feature = "test-support"))]
base_url: None,
#[cfg(feature = "test-support")]
mock: None,
})
}
#[cfg(feature = "test-support")]
pub fn from_mock(mock: crate::mock_client::MockClient) -> Self {
Self {
http: reqwest::Client::new(),
token_provider: Arc::new(crate::token::StaticTokenProvider::new("mock-token")),
retry_config: RetryConfig::default(),
quota_project_override: None,
rate_limiter: RateLimiter::new(RateLimitConfig::default().0),
base_url: None, mock: Some(Arc::new(mock)),
}
}
pub async fn get(&self, url: &str) -> crate::Result<bytes::Bytes> {
self.request_with_retry(reqwest::Method::GET, url, None)
.await
}
pub async fn post(
&self,
url: &str,
body: &impl serde::Serialize,
) -> crate::Result<bytes::Bytes> {
let bytes = serde_json::to_vec(body).map_err(|e| crate::GcpError::InvalidResponse {
message: format!("Failed to serialize request body: {e}"),
body: None,
})?;
self.request_with_retry(reqwest::Method::POST, url, Some(bytes))
.await
}
pub async fn delete(&self, url: &str) -> crate::Result<bytes::Bytes> {
self.request_with_retry(reqwest::Method::DELETE, url, None)
.await
}
pub async fn put(
&self,
url: &str,
body: &impl serde::Serialize,
) -> crate::Result<bytes::Bytes> {
let bytes = serde_json::to_vec(body).map_err(|e| crate::GcpError::InvalidResponse {
message: format!("Failed to serialize request body: {e}"),
body: None,
})?;
self.request_with_retry(reqwest::Method::PUT, url, Some(bytes))
.await
}
pub async fn patch(
&self,
url: &str,
body: &impl serde::Serialize,
) -> crate::Result<bytes::Bytes> {
let bytes = serde_json::to_vec(body).map_err(|e| crate::GcpError::InvalidResponse {
message: format!("Failed to serialize request body: {e}"),
body: None,
})?;
self.request_with_retry(reqwest::Method::PATCH, url, Some(bytes))
.await
}
pub fn access_approval(&self) -> crate::api::AccessApprovalClient<'_> {
crate::api::AccessApprovalClient::new(self)
}
pub fn apikeys(&self) -> crate::api::ApikeysClient<'_> {
crate::api::ApikeysClient::new(self)
}
pub fn appengine(&self) -> crate::api::AppEngineClient<'_> {
crate::api::AppEngineClient::new(self)
}
pub fn bigquery(&self) -> crate::api::BigqueryClient<'_> {
crate::api::BigqueryClient::new(self)
}
pub fn cloud_asset(&self) -> crate::api::CloudAssetClient<'_> {
crate::api::CloudAssetClient::new(self)
}
pub fn billing(&self) -> crate::api::BillingClient<'_> {
crate::api::BillingClient::new(self)
}
pub fn kms(&self) -> crate::api::KmsClient<'_> {
crate::api::KmsClient::new(self)
}
pub fn projects(&self) -> crate::api::ProjectsClient<'_> {
crate::api::ProjectsClient::new(self)
}
pub fn scheduler(&self) -> crate::api::SchedulerClient<'_> {
crate::api::SchedulerClient::new(self)
}
pub fn compute(&self) -> crate::api::ComputeClient<'_> {
crate::api::ComputeClient::new(self)
}
pub fn container(&self) -> crate::api::ContainerClient<'_> {
crate::api::ContainerClient::new(self)
}
pub fn dlp(&self) -> crate::api::DlpClient<'_> {
crate::api::DlpClient::new(self)
}
pub fn dns(&self) -> crate::api::DnsClient<'_> {
crate::api::DnsClient::new(self)
}
pub fn essential_contacts(&self) -> crate::api::EssentialContactsClient<'_> {
crate::api::EssentialContactsClient::new(self)
}
pub fn gkebackup(&self) -> crate::api::GkeBackupClient<'_> {
crate::api::GkeBackupClient::new(self)
}
pub fn iam(&self) -> crate::api::IamClient<'_> {
crate::api::IamClient::new(self)
}
pub fn logging(&self) -> crate::api::LoggingClient<'_> {
crate::api::LoggingClient::new(self)
}
pub fn monitoring(&self) -> crate::api::MonitoringClient<'_> {
crate::api::MonitoringClient::new(self)
}
pub fn osconfig(&self) -> crate::api::OsConfigClient<'_> {
crate::api::OsConfigClient::new(self)
}
pub fn recommender(&self) -> crate::api::RecommenderClient<'_> {
crate::api::RecommenderClient::new(self)
}
pub fn secret_manager(&self) -> crate::api::SecretManagerClient<'_> {
crate::api::SecretManagerClient::new(self)
}
pub fn service_usage(&self) -> crate::api::ServiceUsageClient<'_> {
crate::api::ServiceUsageClient::new(self)
}
pub fn sqladmin(&self) -> crate::api::SqladminClient<'_> {
crate::api::SqladminClient::new(self)
}
pub fn storage(&self) -> crate::api::StorageClient<'_> {
crate::api::StorageClient::new(self)
}
fn resolve_quota_project(&self) -> Option<String> {
if let Some(ref project) = self.quota_project_override {
return Some(project.clone());
}
if let Ok(project) = std::env::var("GOOGLE_CLOUD_QUOTA_PROJECT")
&& !project.is_empty()
{
return Some(project);
}
self.token_provider
.quota_project_id()
.map(|s| s.to_string())
}
async fn request_with_retry(
&self,
method: reqwest::Method,
url: &str,
body: Option<Vec<u8>>,
) -> crate::Result<bytes::Bytes> {
#[cfg(feature = "test-support")]
if let Some(ref mock) = self.mock {
let value = body
.as_deref()
.map(serde_json::from_slice::<serde_json::Value>)
.transpose()
.map_err(|e| crate::GcpError::InvalidResponse {
message: format!("Failed to deserialize request body for mock: {e}"),
body: None,
})?;
let result = mock.execute(method.as_str(), url, value.as_ref()).await?;
return Ok(bytes::Bytes::from(result));
}
let _permit = self.rate_limiter.acquire(url).await;
let quota_project = self.resolve_quota_project();
let mut attempt = 0;
let mut backoff = self.retry_config.initial_backoff;
loop {
let token = self
.token_provider
.get_token(&["https://www.googleapis.com/auth/cloud-platform"])
.await
.map_err(|e| crate::GcpError::InvalidResponse {
message: format!("Failed to get token: {}", e),
body: None,
})?;
let mut request = self.http.request(method.clone(), url);
request = request.header("Authorization", format!("Bearer {}", token));
if let Some(ref project) = quota_project {
request = request.header("x-goog-user-project", project.as_str());
}
if let Some(ref body) = body {
request = request
.header("Content-Type", "application/json")
.body(body.clone());
}
let response = request.send().await.map_err(crate::GcpError::from)?;
match self.classify_response(response, url, method.as_str()).await {
Ok(data) => return Ok(data),
Err(crate::GcpError::Auth { .. })
if attempt == 0 && self.retry_config.retry_on_401 =>
{
self.token_provider.on_token_rejected();
attempt += 1;
continue;
}
Err(e) if e.is_retryable() && attempt < self.retry_config.max_retries => {
let delay = self.retry_config.compute_backoff(backoff, e.retry_after());
tokio::time::sleep(delay).await;
backoff = std::time::Duration::from_secs_f64(
backoff.as_secs_f64() * self.retry_config.backoff_multiplier,
);
attempt += 1;
continue;
}
Err(e) => return Err(e),
}
}
}
async fn classify_response(
&self,
response: reqwest::Response,
resource: &str,
method: &str,
) -> crate::Result<bytes::Bytes> {
let status = response.status();
let _status_code = status.as_u16();
match status {
reqwest::StatusCode::OK
| reqwest::StatusCode::CREATED
| reqwest::StatusCode::NO_CONTENT => {
let body = response.bytes().await.map_err(crate::GcpError::from)?;
Ok(body)
}
reqwest::StatusCode::UNAUTHORIZED => {
let body = response.text().await.unwrap_or_default();
Err(crate::GcpError::Auth {
message: Self::extract_error_message(&body),
})
}
reqwest::StatusCode::FORBIDDEN => {
let body = response.text().await.unwrap_or_default();
let message = Self::extract_error_message(&body);
if message.contains("API") && message.contains("not enabled") {
let api = Self::extract_api_name(&message);
Err(crate::GcpError::ApiNotEnabled { api, message })
} else {
Err(crate::GcpError::PermissionDenied {
message,
resource: resource.to_string(),
method: method.to_string(),
})
}
}
reqwest::StatusCode::NOT_FOUND => Err(crate::GcpError::NotFound {
resource: resource.to_string(),
method: method.to_string(),
}),
reqwest::StatusCode::TOO_MANY_REQUESTS => {
let retry_after = response
.headers()
.get("retry-after")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse().ok());
let body = response.text().await.unwrap_or_default();
Err(crate::GcpError::RateLimited {
retry_after,
message: Self::extract_error_message(&body),
resource: resource.to_string(),
})
}
reqwest::StatusCode::BAD_REQUEST => {
let body = response.text().await.unwrap_or_default();
let message = Self::extract_error_message(&body);
let field = Self::extract_field_name(&body);
Err(crate::GcpError::InvalidArgument { message, field })
}
s if s.is_server_error() => {
let body = response.text().await.unwrap_or_default();
let message = Self::extract_error_message(&body);
let retryable = matches!(
s,
reqwest::StatusCode::SERVICE_UNAVAILABLE
| reqwest::StatusCode::INTERNAL_SERVER_ERROR
);
Err(crate::GcpError::ServerError {
status: s.as_u16(),
message,
resource: resource.to_string(),
retryable,
})
}
_ => {
let body = response.text().await.ok();
Err(crate::GcpError::InvalidResponse {
message: format!("Unexpected status: {}", status),
body,
})
}
}
}
fn extract_error_message(body: &str) -> String {
serde_json::from_str::<serde_json::Value>(body)
.ok()
.and_then(|json| {
json.get("error")
.and_then(|e| e.get("message"))
.and_then(|m| m.as_str())
.map(|s| s.to_string())
})
.unwrap_or_else(|| body.to_string())
}
fn extract_api_name(message: &str) -> String {
message
.split_whitespace()
.find(|s| s.contains("googleapis.com"))
.unwrap_or("unknown")
.to_string()
}
fn extract_field_name(body: &str) -> Option<String> {
serde_json::from_str::<serde_json::Value>(body)
.ok()
.and_then(|json| {
json.get("error")
.and_then(|e| e.get("field"))
.and_then(|f| f.as_str())
.map(|s| s.to_string())
})
}
}
#[derive(Debug, thiserror::Error)]
pub enum BuilderError {
#[error("TokenProvider is required")]
MissingTokenProvider,
#[error("Failed to create HTTP client: {source}")]
HttpClientCreation {
#[source]
source: reqwest::Error,
},
}
#[derive(Default)]
pub struct GcpHttpClientBuilder {
token_provider: Option<Arc<dyn TokenProvider>>,
http_client: Option<reqwest::Client>,
retry_config: Option<RetryConfig>,
rate_limit_config: Option<RateLimitConfig>,
quota_project: Option<String>,
#[cfg(any(test, feature = "test-support"))]
base_url: Option<String>,
}
impl GcpHttpClientBuilder {
pub fn token_provider<T: TokenProvider + 'static>(mut self, provider: T) -> Self {
self.token_provider = Some(Arc::new(provider));
self
}
pub fn retry_config(mut self, config: RetryConfig) -> Self {
self.retry_config = Some(config);
self
}
pub fn http_client(mut self, client: reqwest::Client) -> Self {
self.http_client = Some(client);
self
}
pub fn rate_limit(mut self, config: RateLimitConfig) -> Self {
self.rate_limit_config = Some(config);
self
}
pub fn quota_project(mut self, project: impl Into<String>) -> Self {
self.quota_project = Some(project.into());
self
}
#[cfg(any(test, feature = "test-support"))]
pub fn base_url(mut self, url: impl Into<String>) -> Self {
self.base_url = Some(url.into());
self
}
pub fn build(self) -> Result<GcpHttpClient, BuilderError> {
let token_provider = self
.token_provider
.ok_or(BuilderError::MissingTokenProvider)?;
let retry_config = self.retry_config.unwrap_or_default();
let rate_limiter = RateLimiter::new(self.rate_limit_config.unwrap_or_default().0);
let http = match self.http_client {
Some(client) => client,
None => reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(60))
.build()
.map_err(|e| BuilderError::HttpClientCreation { source: e })?,
};
Ok(GcpHttpClient {
http,
token_provider,
retry_config,
quota_project_override: self.quota_project,
rate_limiter,
#[cfg(any(test, feature = "test-support"))]
base_url: self.base_url,
#[cfg(feature = "test-support")]
mock: None,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::token::StaticTokenProvider;
use serial_test::serial;
use std::fs;
use tempfile::TempDir;
#[test]
fn builder_creates_client() {
let provider = StaticTokenProvider::new("token");
let client = GcpHttpClient::builder()
.token_provider(provider)
.build()
.unwrap();
assert!(Arc::strong_count(&client.token_provider) >= 1);
}
const TEST_PRIVATE_KEY: &str = "-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAv/ZgLU6ZAC6tVLSjrzeVeigCbmFVXFqeohH0wsveMsgUnktr
KoQUhclteEs7iGLwKPQdWyOMIaQFC320l1wqvJKg7XbWtYyC856yBtHBisXzjUIP
vppA7Ie6N0uKtqZ1HLXKbqEd5bNPEU61LJJgLXOdXRbb+9EhLusQpWQb3cPLI/za
qmSa6UYwEv8GCtIgNqTjKycHCi0MqKpjNZ6wFzvPruLqnkOhtA2LVsklp+9jYxca
yv9UBS5xVQ01WHHSR/J//G2v0yCUzmitdDJTQyOd4zlPkHpm6T69m7NaE1fTVSiP
sVfO3Hn7VfzAgbQkZH40Q+OSlTubBQcZ8JyJWQIDAQABAoIBAAJOMBqt3GO2lnbT
YjmJvPDAt8IXHFUVsoeG7diuuvOtraUMCf9dTY4gx0DgAxjwz+pnVM3s0p3vJW9d
T7SsqEe8/r6eBCyd1s8cYLjOaUO50Q0T1h1nfAWgiKw+1Zg6zg0YTX7VeQbdR9hm
SKyTx8tr8sp01T4EOuDgdQH40+aD0ivfbFdIyim82IGh7HHvJyMVxTsMHG1fRo/d
kYpT3g2jOpEYCe0EnmAp2bB1kLLonlW+Xp3OOYShLXUtwXf8q/fOMcYbm3BV2OvC
zhTaKmvStEpMhLHihsNJIf0uQZypY6lNu9IDhj2dacKjRNEpZn+ulwfdcx46VEAy
gnt9IKECgYEA/Y4fSVQ4TsGJvH4vjb0+3o4uckWEPTffUwJG6r4PYSZGA8qQpr1E
oyxXdZt0atcsXs1Qd1E2t2FruumS18CTQvJE0+q06kqa6djYH7svOzfjxC5F4Np4
EkMYEVecJ5qG1bU77BqJwY4rTRQ96e5PmgjTPV9hfxvNX51JTm06gfECgYEAwdA3
pabK+6x2hjxyNLldPdcctFZqA16OeL5VWk5/7L2kLFUYVjyz129vAtkJKnB+GAJs
uZcNwhBbqSiK0vv5WoQOIzrSzbzLZ7STjymMzZ6FOkdElbD6H811idKpI87fQD0O
Eo4L5wslqIVxR8ktoRJgRvI6sFm1ajSsarlrlekCgYAC7cJUwYFI/5lMsRRxia8R
OQk2TrFBV8Tfm5YgHgPldmC2qH9VPbhuPhPgiuQkW8nqamq0hh6graJl7U7B6TqK
OmwrGnnufuAdNWEBtNLN105tNK+f8kYSx+2ePanTF0jZbRd9Ga1fq/m6ETLJ4fPP
bqyp99ETe8m6ggGXw1E6sQKBgG950rf90pylWtrk44992qqaEtGLLpjXhzzdxPwX
UK8beNVi8IeRjKNqXcCWkxYM9AndQyoQPwKTJBWM0yR9d7PfZr5OtDdP0vLIQ2NB
s9IEzn5xxXoP/B3UsDlgqJaHA5PQSkrT1vbCS5u9fSWcChmuFyBXbPhH8PewakdM
dRwZAoGAexRCCrNBek6bxaCVI8JqRRIGYPAwr9sUjKwn8Tdhutx8lvcKOk6AHG1I
uQAVf8HQ3eHRgsCSodf1XeoLWX+0Nxt/KJ1KotVlchFlCLuSzpNkR7WbLC7QfCkJ
RLK9OKOIcBVVctVsUtrWLjTEHyKVhYwIW98X+LAal+i55n75SHU=
-----END RSA PRIVATE KEY-----";
fn create_test_service_account_json() -> String {
format!(
r#"{{
"type": "service_account",
"project_id": "test-project",
"private_key_id": "key123",
"private_key": {:?},
"client_email": "test@test-project.iam.gserviceaccount.com",
"client_id": "123456789",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token"
}}"#,
TEST_PRIVATE_KEY
)
}
#[test]
fn from_service_account_file_creates_client() {
let temp_dir = TempDir::new().unwrap();
let key_path = temp_dir.path().join("service_account.json");
fs::write(&key_path, create_test_service_account_json()).unwrap();
let client = GcpHttpClient::from_service_account_file(&key_path).unwrap();
assert!(Arc::strong_count(&client.token_provider) >= 1);
}
#[test]
fn from_service_account_file_not_found() {
let path = Path::new("/nonexistent/file.json");
let result = GcpHttpClient::from_service_account_file(path);
assert!(result.is_err());
assert!(matches!(
result,
Err(crate::auth::ServiceAccountError::FileReadError { .. })
));
}
#[test]
fn from_service_account_file_invalid_json() {
let temp_dir = TempDir::new().unwrap();
let key_path = temp_dir.path().join("invalid.json");
fs::write(&key_path, "not valid json").unwrap();
let result = GcpHttpClient::from_service_account_file(&key_path);
assert!(result.is_err());
assert!(matches!(
result,
Err(crate::auth::ServiceAccountError::InvalidJson { .. })
));
}
#[tokio::test]
#[serial]
async fn from_adc_with_env_var() {
let temp_dir = TempDir::new().unwrap();
let cred_path = temp_dir.path().join("adc_creds.json");
fs::write(&cred_path, create_test_service_account_json()).unwrap();
let prev_value = std::env::var("GOOGLE_APPLICATION_CREDENTIALS").ok();
unsafe {
std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", &cred_path);
}
let result = GcpHttpClient::from_adc().await;
unsafe {
if let Some(prev) = prev_value {
std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", prev);
} else {
std::env::remove_var("GOOGLE_APPLICATION_CREDENTIALS");
}
}
assert!(result.is_ok());
let client = result.unwrap();
assert!(Arc::strong_count(&client.token_provider) >= 1);
}
#[tokio::test]
#[serial]
async fn from_adc_with_invalid_env_var() {
let prev_value = std::env::var("GOOGLE_APPLICATION_CREDENTIALS").ok();
let prev_gcloud = std::env::var("GOOGLE_AUTH_USE_GCLOUD").ok();
unsafe {
std::env::set_var(
"GOOGLE_APPLICATION_CREDENTIALS",
"/nonexistent/path/to/creds.json",
);
std::env::remove_var("GOOGLE_AUTH_USE_GCLOUD");
}
let result = GcpHttpClient::from_adc().await;
unsafe {
if let Some(prev) = prev_value {
std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", prev);
} else {
std::env::remove_var("GOOGLE_APPLICATION_CREDENTIALS");
}
if let Some(prev) = prev_gcloud {
std::env::set_var("GOOGLE_AUTH_USE_GCLOUD", prev);
}
}
assert!(result.is_err());
assert!(matches!(
result,
Err(crate::auth::AdcError::FileReadError { .. })
));
}
#[test]
fn builder_default_has_rate_limiter() {
let provider = StaticTokenProvider::new("token");
let client = GcpHttpClient::builder()
.token_provider(provider)
.build()
.unwrap();
let stats = client.rate_limit_stats();
let default = stats.iter().find(|s| s.api == "default").unwrap();
assert_eq!(default.limit, 20);
}
#[test]
fn builder_with_custom_rate_limit() {
let provider = StaticTokenProvider::new("token");
let config = crate::RateLimitConfig::default().with_default_limit(50);
let client = GcpHttpClient::builder()
.token_provider(provider)
.rate_limit(config)
.build()
.unwrap();
let stats = client.rate_limit_stats();
let default = stats.iter().find(|s| s.api == "default").unwrap();
assert_eq!(default.limit, 50);
}
#[test]
fn builder_with_disabled_rate_limit() {
let provider = StaticTokenProvider::new("token");
let client = GcpHttpClient::builder()
.token_provider(provider)
.rate_limit(crate::RateLimitConfig::disabled())
.build()
.unwrap();
let stats = client.rate_limit_stats();
let default = stats.iter().find(|s| s.api == "default").unwrap();
assert_eq!(default.limit, tokio::sync::Semaphore::MAX_PERMITS);
}
#[test]
fn builder_with_quota_project() {
let provider = StaticTokenProvider::new("token");
let client = GcpHttpClient::builder()
.token_provider(provider)
.quota_project("explicit-quota-project")
.build()
.unwrap();
assert_eq!(client.quota_project_id(), Some("explicit-quota-project"));
}
#[test]
fn builder_without_quota_project_uses_provider() {
let provider = StaticTokenProvider::new("token");
let client = GcpHttpClient::builder()
.token_provider(provider)
.build()
.unwrap();
assert!(client.quota_project_id().is_none());
}
}