use anyhow::{bail, Context, Result};
use reqwest::Client;
use std::time::Duration;
pub use super::iceberg_types::*;
/// Base URL of the catalog service; the account id and bucket are appended to it as
/// path segments when building request URLs.
const CATALOG_BASE: &str = "https://catalog.cloudflarestorage.com";
/// Timeout passed to the `reqwest` client builder in `IcebergClient::new`.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
/// Client for the Iceberg REST catalog of a single account/bucket pair.
///
/// `fetch_config` must succeed before any table-level call, because table URLs
/// embed the catalog prefix it retrieves.
pub struct IcebergClient {
/// HTTP client built in `new` with a fixed user agent and request timeout.
client: Client,
/// API token sent as a bearer token on every request.
token: String,
/// Account id; used as a URL path segment and as part of the warehouse name.
account_id: String,
/// Bucket name; used as a URL path segment and as part of the warehouse name.
bucket: String,
/// Catalog prefix returned by the config endpoint; `None` until `fetch_config` succeeds.
prefix: Option<String>,
}
impl IcebergClient {
/// Builds a client for the given account and bucket.
///
/// The token is sanity-checked by byte length only (non-empty, at least 20 and at
/// most 200 bytes); no request is made here. The catalog prefix starts out unset.
///
/// # Errors
///
/// Returns an error if the token fails the length checks or the HTTP client
/// cannot be constructed.
pub fn new(token: String, account_id: String, bucket: String) -> Result<Self> {
    match token.len() {
        0 => bail!("R2 API token is required. Create one at: dash.cloudflare.com > R2 > Manage R2 API Tokens"),
        1..=19 => bail!("R2 API token appears too short. Verify you copied the complete token."),
        len if len > 200 => {
            bail!("R2 API token appears too long. Verify you copied only the token value.")
        }
        _ => {}
    }

    let builder = Client::builder()
        .user_agent("otlp2pipeline-cli")
        .timeout(REQUEST_TIMEOUT);
    let client = builder.build().context("Failed to create HTTP client")?;

    Ok(Self {
        client,
        token,
        account_id,
        bucket,
        prefix: None,
    })
}
/// Returns `{CATALOG_BASE}/{account_id}/{bucket}`, the root of every catalog URL.
fn catalog_base_url(&self) -> String {
    let segments = [CATALOG_BASE, self.account_id.as_str(), self.bucket.as_str()];
    segments.join("/")
}
/// Fetches the catalog configuration and stores the prefix it advertises.
///
/// Issues `GET {base}/v1/config?warehouse={account_id}_{bucket}` with the bearer
/// token and reads the `prefix` entry from the response's `overrides`.
///
/// # Errors
///
/// Returns an error if the request fails, the server answers with a non-success
/// status, the body cannot be parsed as a `CatalogConfig`, or the config carries
/// no (or an empty) prefix.
pub async fn fetch_config(&mut self) -> Result<()> {
    let warehouse = format!("{}_{}", self.account_id, self.bucket);
    let url = format!(
        "{}/v1/config?warehouse={}",
        self.catalog_base_url(),
        warehouse
    );

    let response = self
        .client
        .get(&url)
        .bearer_auth(&self.token)
        .send()
        .await
        .context("Failed to fetch catalog config")?;

    let status = response.status();
    if !status.is_success() {
        // The body is best-effort diagnostics; an unreadable body becomes "".
        let body = response.text().await.unwrap_or_default();
        bail!("Failed to fetch catalog config: HTTP {} - {}", status, body);
    }

    let config: CatalogConfig = response
        .json()
        .await
        .context("Failed to parse catalog config")?;

    // An empty prefix would yield `/v1//namespaces/...` URLs whose 404 responses
    // are reported as "table not found", so treat it the same as a missing one.
    self.prefix = config
        .overrides
        .and_then(|o| o.prefix)
        .filter(|prefix| !prefix.is_empty());

    if self.prefix.is_none() {
        bail!("Catalog config does not contain a warehouse prefix");
    }
    Ok(())
}
fn table_url(&self, table: &str) -> Result<String> {
let prefix = self
.prefix
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Catalog prefix not set. Call fetch_config() first."))?;
Ok(format!(
"{}/v1/{}/namespaces/default/tables/{}",
self.catalog_base_url(),
prefix,
table
))
}
/// Loads the metadata of `table` from the catalog.
///
/// Returns `Ok(None)` when the server answers 404, and `Ok(Some(_))` with the
/// parsed body on any success status.
///
/// # Errors
///
/// Returns an error if the prefix is unset, the request fails, the server
/// answers with any other status, or the body cannot be parsed.
pub async fn get_table_metadata(&self, table: &str) -> Result<Option<TableMetadata>> {
    let url = self.table_url(table)?;
    let request = self.client.get(&url).bearer_auth(&self.token);
    let response = request
        .send()
        .await
        .with_context(|| format!("Failed to fetch metadata for table '{}'", table))?;

    match response.status() {
        reqwest::StatusCode::NOT_FOUND => Ok(None),
        status if status.is_success() => {
            let metadata: TableMetadata = response
                .json()
                .await
                .with_context(|| format!("Failed to parse metadata for table '{}'", table))?;
            Ok(Some(metadata))
        }
        status => {
            // Best-effort body for diagnostics; unreadable bodies become "".
            let body = response.text().await.unwrap_or_default();
            bail!(
                "Failed to fetch table '{}': HTTP {} - {}",
                table,
                status,
                body
            );
        }
    }
}
/// Adds a partition spec that partitions `table` by `service_name` (identity
/// transform) and makes it the default spec.
///
/// Any `day`-transform fields from the first existing spec that has one are
/// carried over into the new spec. The commit asserts the table UUID and the
/// current default spec id, so a concurrent change surfaces as a conflict; on
/// conflict the metadata is re-read and the commit rebuilt, up to `retries`
/// additional times.
///
/// Returns `TableNotFound` if the table does not exist, `AlreadyPartitioned` if
/// it is already partitioned by `service_name`, and `Added` after a successful
/// commit.
///
/// # Errors
///
/// Returns an error if metadata cannot be fetched, the table has no UUID or no
/// `service_name` field, the commit fails for a non-conflict reason, or
/// conflicts persist after all retries.
pub async fn add_partition_spec(
    &self,
    table: &str,
    mut retries: u32,
) -> Result<AddPartitionResult> {
    loop {
        let Some(metadata) = self.get_table_metadata(table).await? else {
            return Ok(AddPartitionResult::TableNotFound);
        };
        let inner = &metadata.metadata;
        if inner.is_partitioned_by_service_name() {
            return Ok(AddPartitionResult::AlreadyPartitioned);
        }

        let table_uuid = inner
            .table_uuid
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("Table '{}' has no UUID", table))?;
        let default_spec_id = inner.default_spec_id.unwrap_or(0);

        // Base the new field id on the highest id actually in use, not only on the
        // reported `last_partition_id`: when that value is absent, the 999 fallback
        // would hand out 1000, which can duplicate a `day` field copied below.
        let last_partition_id = inner
            .partition_specs
            .iter()
            .flat_map(|spec| spec.fields.iter())
            .map(|f| f.field_id)
            .fold(inner.last_partition_id.unwrap_or(999), |highest, id| {
                highest.max(id)
            });

        let service_name_field_id = inner
            .get_service_name_field_id()
            .ok_or_else(|| anyhow::anyhow!("Table '{}' has no service_name field", table))?;

        // Carry over the `day` fields (with their existing field ids) from the
        // first spec that has any.
        let day_partition_fields: Vec<NewPartitionField> = inner
            .partition_specs
            .iter()
            .find(|spec| spec.fields.iter().any(|f| f.transform == "day"))
            .map(|spec| {
                spec.fields
                    .iter()
                    .filter(|f| f.transform == "day")
                    .map(|f| NewPartitionField {
                        name: f.name.clone(),
                        transform: f.transform.clone(),
                        source_id: f.source_id,
                        field_id: f.field_id,
                    })
                    .collect()
            })
            .unwrap_or_default();

        // NOTE(review): assumes the default spec has the highest spec id, so that
        // `default + 1` is unused — confirm against tables with several specs.
        let new_spec_id = default_spec_id + 1;
        let new_partition_field_id = last_partition_id + 1;

        let mut new_fields = day_partition_fields;
        new_fields.push(NewPartitionField {
            name: "service_name".to_string(),
            transform: "identity".to_string(),
            source_id: service_name_field_id,
            field_id: new_partition_field_id,
        });

        let commit_request = CommitRequest {
            requirements: vec![
                CommitRequirement::AssertTableUuid {
                    uuid: table_uuid.clone(),
                },
                CommitRequirement::AssertDefaultSpecId { default_spec_id },
            ],
            updates: vec![
                CommitUpdate::AddSpec {
                    spec: NewPartitionSpec {
                        spec_id: new_spec_id,
                        fields: new_fields,
                    },
                },
                CommitUpdate::SetDefaultSpec {
                    spec_id: new_spec_id,
                },
            ],
        };

        match self.try_commit_table(table, &commit_request).await {
            Ok(()) => return Ok(AddPartitionResult::Added),
            Err(CommitError::Conflict) if retries > 0 => {
                retries -= 1;
                eprintln!(
                    " Conflict detected for '{}', retrying ({} retries left)...",
                    table, retries
                );
                // Brief pause before re-reading metadata and rebuilding the commit.
                tokio::time::sleep(Duration::from_millis(500)).await;
                continue;
            }
            Err(CommitError::Conflict) => {
                bail!("Failed to commit partition spec to table '{}': concurrency conflict after retries", table);
            }
            Err(CommitError::Other(e)) => return Err(e),
        }
    }
}
/// Posts a single commit `request` to the table endpoint.
///
/// A 409 response is reported as `CommitError::Conflict`; transport failures, an
/// unset prefix and every other non-success status become `CommitError::Other`.
async fn try_commit_table(
    &self,
    table: &str,
    request: &CommitRequest,
) -> Result<(), CommitError> {
    let url = self.table_url(table).map_err(CommitError::Other)?;

    let pending = self
        .client
        .post(&url)
        .bearer_auth(&self.token)
        .json(request);
    let response = pending
        .send()
        .await
        .with_context(|| format!("Failed to commit partition spec to table '{}'", table))
        .map_err(CommitError::Other)?;

    match response.status() {
        status if status.is_success() => Ok(()),
        reqwest::StatusCode::CONFLICT => Err(CommitError::Conflict),
        status => {
            // Best-effort body for diagnostics; unreadable bodies become "".
            let body = response.text().await.unwrap_or_default();
            Err(CommitError::Other(anyhow::anyhow!(
                "Failed to commit partition spec to table '{}': HTTP {} - {}",
                table,
                status,
                body
            )))
        }
    }
}
}
/// Outcome of a failed table commit, separating retryable conflicts from
/// everything else.
#[derive(Debug)]
enum CommitError {
    /// The server answered 409; the caller may re-read metadata and retry.
    Conflict,
    /// Any other failure (transport error, unset prefix, unexpected status).
    Other(anyhow::Error),
}