use super::commit_types::{CommitTableRequest, CommitTableResponse};
use super::types;
use super::IcebergRestCatalog;
use crate::catalog::{
AuthProvider, CatalogError, CatalogOptions, HttpClientConfig, R2Config, Result,
};
use crate::io::FileIO;
use crate::spec::TableIdent;
use reqwest::Client;
#[cfg(not(target_family = "wasm"))]
use super::arn::{parse_s3tables_arn, ARN_ENCODE_SET};
#[cfg(not(target_family = "wasm"))]
use aws_credential_types::provider::ProvideCredentials;
#[cfg(not(target_family = "wasm"))]
use percent_encoding::utf8_percent_encode;
impl IcebergRestCatalog {
/// Creates a catalog backed by Cloudflare R2 using [`CatalogOptions::default`].
///
/// This is a convenience wrapper that forwards every argument to
/// [`Self::from_r2_with_options`].
///
/// # Errors
///
/// Propagates any error returned by [`Self::from_r2_with_options`].
pub async fn from_r2(
    name: String,
    account_id: impl Into<String>,
    bucket_name: impl Into<String>,
    api_token: impl Into<String>,
) -> Result<Self> {
    let default_options = CatalogOptions::default();
    Self::from_r2_with_options(name, account_id, bucket_name, api_token, default_options).await
}
/// Creates a catalog backed by Cloudflare R2 with caller-supplied [`CatalogOptions`].
///
/// The identifiers are packed into an [`R2Config`] with no endpoint override
/// and handed to [`Self::from_r2_config_with_options`].
///
/// # Errors
///
/// Propagates any error returned by [`Self::from_r2_config_with_options`].
pub async fn from_r2_with_options(
    name: String,
    account_id: impl Into<String>,
    bucket_name: impl Into<String>,
    api_token: impl Into<String>,
    options: CatalogOptions,
) -> Result<Self> {
    Self::from_r2_config_with_options(
        name,
        R2Config {
            account_id: account_id.into(),
            bucket_name: bucket_name.into(),
            api_token: api_token.into(),
            // No override: the default catalog endpoint is derived downstream.
            endpoint_override: None,
        },
        options,
    )
    .await
}
pub async fn from_r2_config_with_options(
name: String,
config: R2Config,
options: CatalogOptions,
) -> Result<Self> {
let endpoint = config.endpoint_override.unwrap_or_else(|| {
format!(
"https://catalog.cloudflarestorage.com/{}/{}",
config.account_id, config.bucket_name
)
});
let auth = Box::new(crate::catalog::BearerTokenAuthProvider::new(
config.api_token,
));
let http_client = build_http_client(options.http())?;
let warehouse = format!("{}_{}", config.account_id, config.bucket_name);
let config_url = format!("{}/v1/config?warehouse={}", endpoint, warehouse);
let req = http_client.get(&config_url).build().map_err(|e| {
CatalogError::HttpError(format!("Failed to build config request: {}", e))
})?;
let signed_req = auth.sign_request(req).await?;
let response = http_client
.execute(signed_req)
.await
.map_err(|e| CatalogError::HttpError(format!("Config request failed: {}", e)))?;
let status = response.status();
let body_text = response
.text()
.await
.unwrap_or_else(|_| "Unable to read response".to_string());
if !status.is_success() {
return Err(CatalogError::HttpError(format!(
"Config request failed with status {}: {}",
status, body_text
)));
}
let config_response: types::ConfigResponse =
serde_json::from_str(&body_text).map_err(|e| {
CatalogError::HttpError(format!("Failed to parse config response: {}", e))
})?;
let mut properties = config_response.defaults;
properties.insert("warehouse".to_string(), warehouse.clone());
properties.extend(config_response.overrides);
let prefix = properties.get("prefix").cloned().unwrap_or_default();
let r2_endpoint = format!("https://{}.r2.cloudflarestorage.com", config.account_id);
let mut s3_config_vec = vec![
("endpoint".to_string(), r2_endpoint),
("bucket".to_string(), config.bucket_name.clone()),
];
for (key, value) in &properties {
if key.starts_with("s3.") {
let opendal_key = key.strip_prefix("s3.").unwrap_or(key).to_string();
s3_config_vec.push((opendal_key, value.clone()));
}
}
let operator =
opendal::Operator::via_iter(opendal::Scheme::S3, s3_config_vec).map_err(|e| {
CatalogError::Unexpected(format!("Failed to create S3 operator: {}", e))
})?;
let file_io = FileIO::new(operator);
Ok(Self {
endpoint,
prefix,
http_client,
auth_provider: auth,
file_io,
name,
options,
})
}
/// Creates a catalog for the S3 Tables bucket identified by `arn`, using
/// [`CatalogOptions::default`].
///
/// Only compiled for non-wasm targets. Delegates to
/// [`Self::from_s3_tables_arn_with_options`].
///
/// # Errors
///
/// Propagates any error returned by [`Self::from_s3_tables_arn_with_options`].
#[cfg(not(target_family = "wasm"))]
pub async fn from_s3_tables_arn(name: String, arn: &str) -> Result<Self> {
    let default_options = CatalogOptions::default();
    Self::from_s3_tables_arn_with_options(name, arn, default_options).await
}
/// Creates a catalog for the S3 Tables bucket identified by `arn`, with
/// caller-supplied [`CatalogOptions`].
///
/// The region parsed from the ARN selects the endpoint
/// `https://s3tables.{region}.amazonaws.com/iceberg`, and the percent-encoded
/// ARN becomes the catalog prefix. AWS credentials are resolved once, through
/// the default `aws_config` provider chain, and are used both for SigV4
/// request signing (service name `s3tables`) and for the `FileIO`.
///
/// NOTE(review): the credentials are captured at construction time; whether
/// they are refreshed later is not visible from this function — confirm if
/// short-lived credentials must be supported.
///
/// Only compiled for non-wasm targets.
///
/// # Errors
///
/// * Any error returned by `parse_s3tables_arn` for the given ARN.
/// * [`CatalogError::AuthError`] if no credentials provider is configured or
///   the credentials cannot be loaded.
/// * Any error returned by `build_http_client`.
#[cfg(not(target_family = "wasm"))]
pub async fn from_s3_tables_arn_with_options(
    name: String,
    arn: &str,
    options: CatalogOptions,
) -> Result<Self> {
    // Only the region is needed; the second ARN component is not used in this function.
    let (region, _) = parse_s3tables_arn(arn)?;
    let endpoint = format!("https://s3tables.{}.amazonaws.com/iceberg", region);
    let warehouse_prefix = utf8_percent_encode(arn, ARN_ENCODE_SET).to_string();

    let sdk_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
    let credentials = sdk_config
        .credentials_provider()
        .ok_or_else(|| CatalogError::AuthError("No credentials provider found".to_string()))?
        .provide_credentials()
        .await
        .map_err(|e| CatalogError::AuthError(format!("Failed to load credentials: {}", e)))?;

    // Copy out the values the FileIO needs first, so the credentials object
    // itself can be moved into the signer without cloning it.
    let file_io_credentials = crate::io::AwsCredentials {
        access_key_id: credentials.access_key_id().to_string(),
        secret_access_key: credentials.secret_access_key().to_string(),
        session_token: credentials.session_token().map(|s| s.to_string()),
    };
    let auth = Box::new(crate::catalog::SigV4AuthProvider::new(
        region.clone(),
        "s3tables".to_string(),
        credentials,
    ));
    let http_client = build_http_client(options.http())?;
    let file_io = FileIO::from_aws_credentials(file_io_credentials, region);

    Ok(Self {
        endpoint,
        prefix: warehouse_prefix,
        http_client,
        auth_provider: auth,
        file_io,
        name,
        options,
    })
}
/// Sends a commit request for the table named by `identifier`.
///
/// The namespace parts are joined with `/` and passed, together with the
/// table name, to `table_url` to build the target URL. The request is sent as
/// a JSON `POST` through `send_request`.
///
/// NOTE(review): the meaning of the `true` flag passed to `table_url` is not
/// visible from this function — confirm against its definition.
///
/// # Errors
///
/// * [`CatalogError::Conflict`] if the server answers with HTTP 409.
/// * [`CatalogError::HttpError`] if the request cannot be built or the
///   response payload cannot be deserialized.
/// * Any error returned by `send_request` or `handle_response`.
pub async fn commit_table(
    &self,
    identifier: &TableIdent,
    request: CommitTableRequest,
) -> Result<CommitTableResponse> {
    let namespace_path = identifier.namespace().as_ref().join("/");
    let commit_url = self.table_url(&namespace_path, identifier.name(), true);

    let http_request = self
        .http_client
        .post(&commit_url)
        .json(&request)
        .build()
        .map_err(|err| CatalogError::HttpError(format!("Failed to build request: {}", err)))?;
    let response = self.send_request(http_request).await?;

    // A 409 is turned into a dedicated error before the generic response handling runs.
    if response.status().as_u16() == 409 {
        return Err(CatalogError::Conflict(
            "Concurrent modification detected".to_string(),
        ));
    }

    let payload = self.handle_response(response).await?;
    serde_json::from_value(payload)
        .map_err(|err| CatalogError::HttpError(format!("Failed to parse response: {}", err)))
}
}
/// Builds the `reqwest` client used on non-wasm targets.
///
/// The overall timeout, connect timeout, and user agent are each applied only
/// when `config` provides a value for them.
///
/// # Errors
///
/// Returns [`CatalogError::HttpError`] if the client cannot be constructed.
#[cfg(not(target_family = "wasm"))]
fn build_http_client(config: &HttpClientConfig) -> Result<Client> {
    let builder = Client::builder();
    let builder = match config.timeout() {
        Some(limit) => builder.timeout(limit),
        None => builder,
    };
    let builder = match config.connect_timeout() {
        Some(limit) => builder.connect_timeout(limit),
        None => builder,
    };
    let builder = match config.user_agent() {
        Some(agent) => builder.user_agent(agent.to_string()),
        None => builder,
    };
    builder
        .build()
        .map_err(|err| CatalogError::HttpError(format!("Failed to build HTTP client: {}", err)))
}
/// Builds the `reqwest` client used on wasm targets.
///
/// The configuration argument is accepted so the signature matches the
/// non-wasm variant, but none of its values are applied here.
///
/// # Errors
///
/// Returns [`CatalogError::HttpError`] if the client cannot be constructed.
#[cfg(target_family = "wasm")]
fn build_http_client(_config: &HttpClientConfig) -> Result<Client> {
    match Client::builder().build() {
        Ok(client) => Ok(client),
        Err(err) => Err(CatalogError::HttpError(format!(
            "Failed to build HTTP client: {}",
            err
        ))),
    }
}