use std::collections::HashMap;
use async_trait::async_trait;
use base64::{Engine, engine::general_purpose::URL_SAFE_NO_PAD};
use google_cloud_auth::credentials;
use lance_core::{Error, Result};
use lance_io::object_store::uri_to_url;
use lance_namespace::models::Identity;
use log::{debug, info, warn};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use super::{CredentialVendor, VendedCredentials, VendedPermission, redact_credential};
/// Google STS token endpoint. This module POSTs form-encoded token-exchange
/// requests to it, both to downscope an access token and to exchange an OIDC
/// token for a GCP token.
const STS_TOKEN_EXCHANGE_URL: &str = "https://sts.googleapis.com/v1/token";
/// Configuration for [`GcpCredentialVendor`], usually assembled through the
/// chained `with_*` builder methods.
#[derive(Debug, Clone, Default)]
pub struct GcpCredentialVendorConfig {
/// Service account to impersonate when vending from the vendor's own
/// credential (the API-key and no-identity paths). `None` means the base
/// token is used directly.
pub service_account: Option<String>,
/// Permission level applied when the caller supplies no identity or an
/// OIDC auth token.
pub permission: VendedPermission,
/// Workload identity provider used as the STS `audience` when exchanging an
/// OIDC token. Required for the auth-token path.
pub workload_identity_provider: Option<String>,
/// Service account to impersonate after the OIDC token exchange. `None`
/// means the federated token is used as-is.
pub impersonation_service_account: Option<String>,
/// Salt combined with the caller's API key before hashing. Required for the
/// API-key path.
pub api_key_salt: Option<String>,
/// Maps the hex SHA-256 hash of `"{api_key}:{salt}"` to the permission
/// granted to that key.
pub api_key_hash_permissions: HashMap<String, VendedPermission>,
}
impl GcpCredentialVendorConfig {
    /// Creates a configuration with every field at its `Default` value.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the service account impersonated on the API-key and no-identity paths.
    pub fn with_service_account(self, service_account: impl Into<String>) -> Self {
        Self {
            service_account: Some(service_account.into()),
            ..self
        }
    }

    /// Sets the permission used when no per-key permission applies.
    pub fn with_permission(self, permission: VendedPermission) -> Self {
        Self { permission, ..self }
    }

    /// Sets the workload identity provider used as the audience for OIDC token exchange.
    pub fn with_workload_identity_provider(self, provider: impl Into<String>) -> Self {
        Self {
            workload_identity_provider: Some(provider.into()),
            ..self
        }
    }

    /// Sets the service account impersonated after an OIDC token exchange.
    pub fn with_impersonation_service_account(
        self,
        service_account: impl Into<String>,
    ) -> Self {
        Self {
            impersonation_service_account: Some(service_account.into()),
            ..self
        }
    }

    /// Sets the salt that is hashed together with incoming API keys.
    pub fn with_api_key_salt(self, salt: impl Into<String>) -> Self {
        Self {
            api_key_salt: Some(salt.into()),
            ..self
        }
    }

    /// Registers (or overwrites) the permission granted to one API-key hash.
    pub fn with_api_key_hash_permission(
        mut self,
        key_hash: impl Into<String>,
        permission: VendedPermission,
    ) -> Self {
        let hash = key_hash.into();
        self.api_key_hash_permissions.insert(hash, permission);
        self
    }

    /// Replaces the whole API-key-hash to permission table.
    pub fn with_api_key_hash_permissions(
        self,
        permissions: HashMap<String, VendedPermission>,
    ) -> Self {
        Self {
            api_key_hash_permissions: permissions,
            ..self
        }
    }
}
/// One rule of a credential access boundary, serialized with camelCase keys.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct AccessBoundaryRule {
/// Resource the rule applies to; `build_access_boundary` sets it to
/// `//storage.googleapis.com/projects/_/buckets/{bucket}`.
available_resource: String,
/// Permission entries for the rule, written as `inRole:roles/...` strings.
available_permissions: Vec<String>,
/// Optional condition restricting the rule; omitted from the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
availability_condition: Option<AvailabilityCondition>,
}
/// Condition attached to an [`AccessBoundaryRule`].
#[derive(Debug, Clone, Serialize)]
struct AvailabilityCondition {
/// Condition expression text; `build_access_boundary` fills it with a
/// prefix-restricting expression over `resource.name` and the object list prefix.
expression: String,
}
/// Top-level JSON document sent as the `options` form field of the STS
/// downscoping request; serializes as `{"accessBoundary": {...}}`.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct CredentialAccessBoundary {
/// The wrapped list of access boundary rules.
access_boundary: AccessBoundaryInner,
}
/// Inner object of [`CredentialAccessBoundary`]; serializes as
/// `{"accessBoundaryRules": [...]}`.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct AccessBoundaryInner {
/// Rules making up the boundary (one read rule, plus write/delete rules
/// when the permission allows them).
access_boundary_rules: Vec<AccessBoundaryRule>,
}
/// Subset of the STS token-exchange response body that this module reads.
/// Field names are deserialized as written (snake_case).
#[derive(Debug, Deserialize)]
struct TokenExchangeResponse {
/// The access token returned by the exchange.
access_token: String,
/// Token lifetime in seconds; may be absent, in which case
/// `downscope_token` assumes 3600.
#[serde(default)]
expires_in: Option<u64>,
}
/// Response body of the IAM Credentials `generateAccessToken` call,
/// deserialized from camelCase keys (`accessToken`, `expireTime`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct GenerateAccessTokenResponse {
/// Access token for the impersonated service account.
access_token: String,
/// Expiry timestamp string. It must be present for deserialization to
/// succeed but is never read, hence the `dead_code` allowance.
#[allow(dead_code)]
expire_time: String,
}
/// Vends downscoped Google Cloud Storage access tokens for a table location.
pub struct GcpCredentialVendor {
/// Vendor settings: permissions, impersonation targets and the API-key table.
config: GcpCredentialVendorConfig,
/// HTTP client used for the STS and IAM Credentials REST calls.
http_client: Client,
/// Credential that supplies the base access token (see `get_source_token`).
credential: credentials::Credential,
}
impl std::fmt::Debug for GcpCredentialVendor {
    /// Prints the configuration and a fixed placeholder for the credential,
    /// so the underlying credential is never written to logs. The HTTP
    /// client is left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("GcpCredentialVendor");
        out.field("config", &self.config);
        out.field("credential", &"[credential]");
        out.finish()
    }
}
impl GcpCredentialVendor {
pub async fn new(config: GcpCredentialVendorConfig) -> Result<Self> {
let credential = credentials::create_access_token_credential()
.await
.map_err(|e| {
Error::io_source(Box::new(std::io::Error::other(format!(
"Failed to create GCP credentials: {}",
e
))))
})?;
Ok(Self {
config,
http_client: Client::new(),
credential,
})
}
fn parse_gcs_uri(uri: &str) -> Result<(String, String)> {
let url = uri_to_url(uri)?;
if url.scheme() != "gs" {
return Err(Error::invalid_input_source(
format!(
"Unsupported GCS URI scheme '{}', expected 'gs'",
url.scheme()
)
.into(),
));
}
let bucket = url
.host_str()
.ok_or_else(|| {
Error::invalid_input_source(format!("GCS URI '{}' missing bucket", uri).into())
})?
.to_string();
let prefix = url.path().trim_start_matches('/').to_string();
Ok((bucket, prefix))
}
async fn get_source_token(&self) -> Result<String> {
let base_token = self.credential.get_token().await.map_err(|e| {
Error::io_source(Box::new(std::io::Error::other(format!(
"Failed to get GCP token: {}",
e
))))
})?;
if let Some(ref service_account) = self.config.service_account {
return self
.impersonate_service_account(&base_token.token, service_account)
.await;
}
Ok(base_token.token)
}
async fn impersonate_service_account(
&self,
base_token: &str,
service_account: &str,
) -> Result<String> {
let url = format!(
"https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:generateAccessToken",
service_account
);
let body = serde_json::json!({
"scope": ["https://www.googleapis.com/auth/cloud-platform"]
});
let response = self
.http_client
.post(&url)
.bearer_auth(base_token)
.json(&body)
.send()
.await
.map_err(|e| {
Error::io_source(Box::new(std::io::Error::other(format!(
"Failed to call IAM generateAccessToken: {}",
e
))))
})?;
if !response.status().is_success() {
let status = response.status();
let body = response
.text()
.await
.unwrap_or_else(|_| "unknown error".to_string());
return Err(Error::io_source(Box::new(std::io::Error::other(format!(
"IAM generateAccessToken failed for '{}' with status {}: {}",
service_account, status, body
)))));
}
let token_response: GenerateAccessTokenResponse = response.json().await.map_err(|e| {
Error::io_source(Box::new(std::io::Error::other(format!(
"Failed to parse generateAccessToken response: {}",
e
))))
})?;
Ok(token_response.access_token)
}
/// Builds the credential access boundary that restricts a downscoped token
/// to `bucket` and, when `prefix` is non-empty, to objects under that prefix.
///
/// Rules produced (all on the same bucket resource, sharing one condition):
/// * always: `storage.legacyObjectReader` + `storage.objectViewer`;
/// * if `permission.can_write()`: `storage.legacyBucketWriter` +
///   `storage.objectCreator`;
/// * if `permission.can_delete()`: `storage.objectAdmin`.
///
/// With an empty `prefix` no availability condition is attached. Otherwise
/// the condition allows objects whose resource name starts with
/// `projects/_/buckets/{bucket}/objects/{prefix}/`, and list calls whose
/// `objectListPrefix` equals the prefix or starts with `{prefix}/`. Trailing
/// slashes on `prefix` are ignored.
///
/// `bucket` and `prefix` are escaped before being embedded in the condition,
/// so quote or backslash characters in a table location cannot terminate the
/// string literal and alter the expression.
fn build_access_boundary(
    bucket: &str,
    prefix: &str,
    permission: VendedPermission,
) -> CredentialAccessBoundary {
    /// Escapes `raw` for use inside a single-quoted CEL string literal.
    fn escape_cel_literal(raw: &str) -> String {
        let mut escaped = String::with_capacity(raw.len());
        for ch in raw.chars() {
            match ch {
                '\\' => escaped.push_str("\\\\"),
                '\'' => escaped.push_str("\\'"),
                '\n' => escaped.push_str("\\n"),
                '\r' => escaped.push_str("\\r"),
                other => escaped.push(other),
            }
        }
        escaped
    }

    let bucket_resource = format!("//storage.googleapis.com/projects/_/buckets/{}", bucket);

    let condition = if prefix.is_empty() {
        None
    } else {
        let bucket_literal = escape_cel_literal(bucket);
        let prefix_literal = escape_cel_literal(prefix.trim_end_matches('/'));
        let list_prefix_attr =
            "api.getAttribute('storage.googleapis.com/objectListPrefix', '')";
        let expr = format!(
            "resource.name.startsWith('projects/_/buckets/{}/objects/{}/') || \
             {list_attr} == '{prefix}' || {list_attr}.startsWith('{prefix}/')",
            bucket_literal,
            prefix_literal,
            list_attr = list_prefix_attr,
            prefix = prefix_literal
        );
        Some(AvailabilityCondition { expression: expr })
    };

    let mut rules = Vec::with_capacity(3);

    // Read access is always granted.
    rules.push(AccessBoundaryRule {
        available_resource: bucket_resource.clone(),
        available_permissions: vec![
            "inRole:roles/storage.legacyObjectReader".to_string(),
            "inRole:roles/storage.objectViewer".to_string(),
        ],
        availability_condition: condition.clone(),
    });

    if permission.can_write() {
        rules.push(AccessBoundaryRule {
            available_resource: bucket_resource.clone(),
            available_permissions: vec![
                "inRole:roles/storage.legacyBucketWriter".to_string(),
                "inRole:roles/storage.objectCreator".to_string(),
            ],
            availability_condition: condition.clone(),
        });
    }

    if permission.can_delete() {
        rules.push(AccessBoundaryRule {
            available_resource: bucket_resource,
            available_permissions: vec!["inRole:roles/storage.objectAdmin".to_string()],
            availability_condition: condition,
        });
    }

    CredentialAccessBoundary {
        access_boundary: AccessBoundaryInner {
            access_boundary_rules: rules,
        },
    }
}
async fn downscope_token(
&self,
source_token: &str,
access_boundary: &CredentialAccessBoundary,
) -> Result<(String, u64)> {
let options_json = serde_json::to_string(access_boundary).map_err(|e| {
Error::io_source(Box::new(std::io::Error::other(format!(
"Failed to serialize access boundary: {}",
e
))))
})?;
let params = [
(
"grant_type",
"urn:ietf:params:oauth:grant-type:token-exchange",
),
(
"subject_token_type",
"urn:ietf:params:oauth:token-type:access_token",
),
(
"requested_token_type",
"urn:ietf:params:oauth:token-type:access_token",
),
("subject_token", source_token),
("options", &options_json),
];
let response = self
.http_client
.post(STS_TOKEN_EXCHANGE_URL)
.form(¶ms)
.send()
.await
.map_err(|e| {
Error::io_source(Box::new(std::io::Error::other(format!(
"Failed to call STS token exchange: {}",
e
))))
})?;
if !response.status().is_success() {
let status = response.status();
let body = response
.text()
.await
.unwrap_or_else(|_| "unknown error".to_string());
return Err(Error::io_source(Box::new(std::io::Error::other(format!(
"STS token exchange failed with status {}: {}",
status, body
)))));
}
let token_response: TokenExchangeResponse = response.json().await.map_err(|e| {
Error::io_source(Box::new(std::io::Error::other(format!(
"Failed to parse STS response: {}",
e
))))
})?;
let expires_in_secs = token_response.expires_in.unwrap_or(3600);
let expires_at_millis = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("time went backwards")
.as_millis() as u64
+ expires_in_secs * 1000;
Ok((token_response.access_token, expires_at_millis))
}
/// Returns the lowercase hex SHA-256 digest of `"{api_key}:{salt}"`.
///
/// The result is the key format expected in
/// `GcpCredentialVendorConfig::api_key_hash_permissions`.
pub fn hash_api_key(api_key: &str, salt: &str) -> String {
    let salted = [api_key, salt].join(":");
    let digest = Sha256::digest(salted.as_bytes());
    format!("{:x}", digest)
}
/// Derives a log-friendly session name from a JWT-shaped token.
///
/// The token must have exactly three dot-separated segments. The middle
/// segment is base64-decoded (URL-safe alphabet first, then the standard
/// alphabet, both unpadded) and parsed as JSON. The name is `lance-` followed
/// by the `sub` claim, or `email` when `sub` is absent, keeping only
/// alphanumerics and `@`, `-`, `.`. If the chosen claim is not a string,
/// `unknown` is used. Any structural failure yields `lance-gcp-identity`.
///
/// The token signature is not verified here; the result is only a label.
fn derive_session_name_from_token(token: &str) -> String {
    const FALLBACK: &str = "lance-gcp-identity";

    let segments: Vec<&str> = token.split('.').collect();
    let [_, payload_b64, _] = segments.as_slice() else {
        return FALLBACK.to_string();
    };
    let payload_b64: &str = payload_b64;

    let decoded = URL_SAFE_NO_PAD
        .decode(payload_b64)
        .or_else(|_| base64::engine::general_purpose::STANDARD_NO_PAD.decode(payload_b64));
    let Ok(payload) = decoded else {
        return FALLBACK.to_string();
    };

    let Ok(claims) = serde_json::from_slice::<serde_json::Value>(&payload) else {
        return FALLBACK.to_string();
    };

    let subject = claims
        .get("sub")
        .or_else(|| claims.get("email"))
        .and_then(|value| value.as_str())
        .unwrap_or("unknown");

    let mut session_name = String::from("lance-");
    session_name.extend(
        subject
            .chars()
            .filter(|c| c.is_alphanumeric() || matches!(*c, '@' | '-' | '.')),
    );
    session_name
}
/// Returns `provider` as a full audience string beginning with
/// `//iam.googleapis.com/`, adding that prefix only when it is missing.
fn normalize_workload_identity_audience(provider: &str) -> String {
    const IAM_PREFIX: &str = "//iam.googleapis.com/";
    match provider.strip_prefix(IAM_PREFIX) {
        // Already fully qualified: hand it back unchanged.
        Some(_) => provider.to_owned(),
        None => [IAM_PREFIX, provider].concat(),
    }
}
async fn exchange_oidc_for_gcp_token(&self, oidc_token: &str) -> Result<String> {
let workload_identity_provider = self
.config
.workload_identity_provider
.as_ref()
.ok_or_else(|| {
Error::invalid_input_source(
"gcp_workload_identity_provider must be configured for OIDC token exchange"
.into(),
)
})?;
let audience = Self::normalize_workload_identity_audience(workload_identity_provider);
let params = [
(
"grant_type",
"urn:ietf:params:oauth:grant-type:token-exchange",
),
("subject_token_type", "urn:ietf:params:oauth:token-type:jwt"),
(
"requested_token_type",
"urn:ietf:params:oauth:token-type:access_token",
),
("subject_token", oidc_token),
("audience", audience.as_str()),
("scope", "https://www.googleapis.com/auth/cloud-platform"),
];
let response = self
.http_client
.post(STS_TOKEN_EXCHANGE_URL)
.form(¶ms)
.send()
.await
.map_err(|e| {
Error::io_source(Box::new(std::io::Error::other(format!(
"Failed to exchange OIDC token for GCP token: {}",
e
))))
})?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(Error::io_source(Box::new(std::io::Error::other(format!(
"GCP STS token exchange failed with status {}: {}",
status, body
)))));
}
let token_response: TokenExchangeResponse = response.json().await.map_err(|e| {
Error::io_source(Box::new(std::io::Error::other(format!(
"Failed to parse GCP STS token response: {}",
e
))))
})?;
let federated_token = token_response.access_token;
if let Some(ref service_account) = self.config.impersonation_service_account {
return self
.impersonate_service_account(&federated_token, service_account)
.await;
}
Ok(federated_token)
}
/// Vends credentials for a caller identified by an OIDC `auth_token`.
///
/// The token is exchanged for a GCP token, which is then downscoped to
/// `bucket`/`prefix` using the vendor-wide `config.permission`. The returned
/// storage options carry `google_storage_token` and `expires_at_millis`.
///
/// # Errors
/// Passes through failures from the OIDC exchange and the downscoping call.
async fn vend_with_web_identity(
    &self,
    bucket: &str,
    prefix: &str,
    auth_token: &str,
) -> Result<VendedCredentials> {
    let permission = self.config.permission;

    // The session name is only used to label the debug log line.
    let session_name = Self::derive_session_name_from_token(auth_token);
    debug!(
        "GCP vend_with_web_identity: bucket={}, prefix={}, session={}",
        bucket, prefix, session_name
    );

    let gcp_token = self.exchange_oidc_for_gcp_token(auth_token).await?;
    let boundary = Self::build_access_boundary(bucket, prefix, permission);
    let (downscoped_token, expires_at_millis) =
        self.downscope_token(&gcp_token, &boundary).await?;

    let storage_options = HashMap::from([
        ("google_storage_token".to_string(), downscoped_token.clone()),
        (
            "expires_at_millis".to_string(),
            expires_at_millis.to_string(),
        ),
    ]);

    info!(
        "GCP credentials vended (web identity): bucket={}, prefix={}, permission={}, expires_at={}, token={}",
        bucket,
        prefix,
        permission,
        expires_at_millis,
        redact_credential(&downscoped_token)
    );

    Ok(VendedCredentials::new(storage_options, expires_at_millis))
}
async fn vend_with_api_key(
&self,
bucket: &str,
prefix: &str,
api_key: &str,
) -> Result<VendedCredentials> {
let salt = self.config.api_key_salt.as_ref().ok_or_else(|| {
Error::invalid_input_source(
"api_key_salt must be configured to use API key authentication".into(),
)
})?;
let key_hash = Self::hash_api_key(api_key, salt);
let permission = self
.config
.api_key_hash_permissions
.get(&key_hash)
.copied()
.ok_or_else(|| {
warn!(
"Invalid API key: hash {} not found in permissions map",
&key_hash[..8]
);
Error::invalid_input_source("Invalid API key".into())
})?;
debug!(
"GCP vend_with_api_key: bucket={}, prefix={}, permission={}",
bucket, prefix, permission
);
let source_token = self.get_source_token().await?;
let access_boundary = Self::build_access_boundary(bucket, prefix, permission);
let (downscoped_token, expires_at_millis) = self
.downscope_token(&source_token, &access_boundary)
.await?;
let mut storage_options = HashMap::new();
storage_options.insert("google_storage_token".to_string(), downscoped_token.clone());
storage_options.insert(
"expires_at_millis".to_string(),
expires_at_millis.to_string(),
);
info!(
"GCP credentials vended (api_key): bucket={}, prefix={}, permission={}, expires_at={}, token={}",
bucket,
prefix,
permission,
expires_at_millis,
redact_credential(&downscoped_token)
);
Ok(VendedCredentials::new(storage_options, expires_at_millis))
}
}
#[async_trait]
impl CredentialVendor for GcpCredentialVendor {
async fn vend_credentials(
&self,
table_location: &str,
identity: Option<&Identity>,
) -> Result<VendedCredentials> {
debug!(
"GCP credential vending: location={}, permission={}, identity={:?}",
table_location,
self.config.permission,
identity.map(|i| format!(
"api_key={}, auth_token={}",
i.api_key.is_some(),
i.auth_token.is_some()
))
);
let (bucket, prefix) = Self::parse_gcs_uri(table_location)?;
match identity {
Some(id) if id.auth_token.is_some() => {
let auth_token = id.auth_token.as_ref().unwrap();
self.vend_with_web_identity(&bucket, &prefix, auth_token)
.await
}
Some(id) if id.api_key.is_some() => {
let api_key = id.api_key.as_ref().unwrap();
self.vend_with_api_key(&bucket, &prefix, api_key).await
}
Some(_) => Err(Error::invalid_input_source(
"Identity provided but neither auth_token nor api_key is set".into(),
)),
None => {
let source_token = self.get_source_token().await?;
let access_boundary =
Self::build_access_boundary(&bucket, &prefix, self.config.permission);
let (downscoped_token, expires_at_millis) = self
.downscope_token(&source_token, &access_boundary)
.await?;
let mut storage_options = HashMap::new();
storage_options
.insert("google_storage_token".to_string(), downscoped_token.clone());
storage_options.insert(
"expires_at_millis".to_string(),
expires_at_millis.to_string(),
);
info!(
"GCP credentials vended (static): bucket={}, prefix={}, permission={}, expires_at={}, token={}",
bucket,
prefix,
self.config.permission,
expires_at_millis,
redact_credential(&downscoped_token)
);
Ok(VendedCredentials::new(storage_options, expires_at_millis))
}
}
}
fn provider_name(&self) -> &'static str {
"gcp"
}
fn permission(&self) -> VendedPermission {
self.config.permission
}
}
// Unit tests for the pure helpers: URI parsing, the config builder,
// access-boundary construction and audience normalization. None of them
// perform network calls.
#[cfg(test)]
mod tests {
use super::*;
// Valid `gs://` URIs split into bucket and prefix; a bare bucket (with or
// without a trailing slash) yields an empty prefix.
#[test]
fn test_parse_gcs_uri() {
let (bucket, prefix) = GcpCredentialVendor::parse_gcs_uri("gs://my-bucket/path/to/table")
.expect("should parse");
assert_eq!(bucket, "my-bucket");
assert_eq!(prefix, "path/to/table");
let (bucket, prefix) =
GcpCredentialVendor::parse_gcs_uri("gs://my-bucket/").expect("should parse");
assert_eq!(bucket, "my-bucket");
assert_eq!(prefix, "");
let (bucket, prefix) =
GcpCredentialVendor::parse_gcs_uri("gs://my-bucket").expect("should parse");
assert_eq!(bucket, "my-bucket");
assert_eq!(prefix, "");
}
// Wrong scheme, missing bucket, non-URI text and the empty string are all rejected.
#[test]
fn test_parse_gcs_uri_invalid() {
let result = GcpCredentialVendor::parse_gcs_uri("s3://bucket/path");
assert!(result.is_err());
let result = GcpCredentialVendor::parse_gcs_uri("gs:///path");
assert!(result.is_err());
let result = GcpCredentialVendor::parse_gcs_uri("not-a-uri");
assert!(result.is_err());
let result = GcpCredentialVendor::parse_gcs_uri("");
assert!(result.is_err());
}
// Builder methods store the values they are given.
#[test]
fn test_config_builder() {
let config = GcpCredentialVendorConfig::new()
.with_service_account("my-sa@project.iam.gserviceaccount.com")
.with_permission(VendedPermission::Write);
assert_eq!(
config.service_account,
Some("my-sa@project.iam.gserviceaccount.com".to_string())
);
assert_eq!(config.permission, VendedPermission::Write);
}
// Read permission: a single rule with the two reader roles, conditioned on the prefix.
#[test]
fn test_build_access_boundary_read() {
let boundary = GcpCredentialVendor::build_access_boundary(
"my-bucket",
"path/to/data",
VendedPermission::Read,
);
let rules = &boundary.access_boundary.access_boundary_rules;
assert_eq!(rules.len(), 1, "Read should have 1 rule");
let permissions = &rules[0].available_permissions;
assert!(permissions.contains(&"inRole:roles/storage.legacyObjectReader".to_string()));
assert!(permissions.contains(&"inRole:roles/storage.objectViewer".to_string()));
assert!(rules[0].availability_condition.is_some());
}
// Write permission: the read rule plus a rule with the writer/creator roles.
#[test]
fn test_build_access_boundary_write() {
let boundary = GcpCredentialVendor::build_access_boundary(
"my-bucket",
"path/to/data",
VendedPermission::Write,
);
let rules = &boundary.access_boundary.access_boundary_rules;
assert_eq!(rules.len(), 2, "Write should have 2 rules");
let permissions: Vec<_> = rules
.iter()
.flat_map(|r| r.available_permissions.iter())
.collect();
assert!(permissions.contains(&&"inRole:roles/storage.legacyObjectReader".to_string()));
assert!(permissions.contains(&&"inRole:roles/storage.objectViewer".to_string()));
assert!(permissions.contains(&&"inRole:roles/storage.legacyBucketWriter".to_string()));
assert!(permissions.contains(&&"inRole:roles/storage.objectCreator".to_string()));
}
// Admin permission: read and write rules plus a third rule with objectAdmin.
#[test]
fn test_build_access_boundary_admin() {
let boundary = GcpCredentialVendor::build_access_boundary(
"my-bucket",
"path/to/data",
VendedPermission::Admin,
);
let rules = &boundary.access_boundary.access_boundary_rules;
assert_eq!(rules.len(), 3, "Admin should have 3 rules");
let permissions: Vec<_> = rules
.iter()
.flat_map(|r| r.available_permissions.iter())
.collect();
assert!(permissions.contains(&&"inRole:roles/storage.legacyObjectReader".to_string()));
assert!(permissions.contains(&&"inRole:roles/storage.objectViewer".to_string()));
assert!(permissions.contains(&&"inRole:roles/storage.legacyBucketWriter".to_string()));
assert!(permissions.contains(&&"inRole:roles/storage.objectCreator".to_string()));
assert!(permissions.contains(&&"inRole:roles/storage.objectAdmin".to_string()));
}
// An empty prefix produces a rule without any availability condition.
#[test]
fn test_build_access_boundary_no_prefix() {
let boundary =
GcpCredentialVendor::build_access_boundary("my-bucket", "", VendedPermission::Read);
let rules = &boundary.access_boundary.access_boundary_rules;
assert_eq!(rules.len(), 1);
assert!(rules[0].availability_condition.is_none());
}
// The IAM prefix is added when missing, left alone when present, and
// normalization is idempotent.
#[test]
fn test_normalize_workload_identity_audience() {
let short =
"projects/123456/locations/global/workloadIdentityPools/my-pool/providers/my-provider";
let normalized = GcpCredentialVendor::normalize_workload_identity_audience(short);
assert_eq!(
normalized,
"//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/my-pool/providers/my-provider"
);
let full = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/my-pool/providers/my-provider";
let normalized = GcpCredentialVendor::normalize_workload_identity_audience(full);
assert_eq!(normalized, full);
let normalized_again =
GcpCredentialVendor::normalize_workload_identity_audience(&normalized);
assert_eq!(normalized_again, full);
}
}