use anyhow::{Context, Result};
use icepick::catalog::Catalog;
use icepick::spec::{
NamespaceIdent, NestedField, PrimitiveType, Schema, TableCreation, TableIdent, Type,
};
use icepick::writer::ParquetWriter;
use icepick::R2Catalog;
use tracing::{info, warn};
use tracing_subscriber::EnvFilter;
async fn create_r2_catalog_from_env() -> Result<R2Catalog> {
dotenvy::dotenv().ok();
let account_id = std::env::var("CLOUDFLARE_ACCOUNT_ID")
.context("CLOUDFLARE_ACCOUNT_ID not found in environment")?;
let bucket_name = std::env::var("CLOUDFLARE_BUCKET_NAME")
.context("CLOUDFLARE_BUCKET_NAME not found in environment")?;
let api_token = std::env::var("CLOUDFLARE_API_TOKEN")
.context("CLOUDFLARE_API_TOKEN not found in environment")?;
let catalog = R2Catalog::new("r2", account_id, bucket_name, api_token)
.await
.map_err(|e| anyhow::anyhow!("Failed to create R2 catalog: {}", e))?;
Ok(catalog)
}
/// Construct the example's Iceberg schema: a single required `id` column
/// of primitive `Long` type, with field id 1.
fn build_schema() -> Result<Schema> {
    let id_field = NestedField::required_field(
        1,
        "id".to_string(),
        Type::Primitive(PrimitiveType::Long),
    );
    Schema::builder()
        .with_fields(vec![id_field])
        .build()
        .context("Failed to build schema")
}
use arrow::array::Int64Array;
use arrow::record_batch::RecordBatch;
use icepick::arrow_convert::schema_to_arrow;
use std::sync::Arc;
/// Build a three-row Arrow [`RecordBatch`] (`id` = 1, 2, 3) whose Arrow
/// schema is derived from the given Iceberg schema.
fn create_sample_data(iceberg_schema: &Schema) -> Result<RecordBatch> {
    let arrow_schema = schema_to_arrow(iceberg_schema)
        .context("Failed to convert Iceberg schema to Arrow schema")?;
    let ids = Int64Array::from(vec![1, 2, 3]);
    RecordBatch::try_new(Arc::new(arrow_schema), vec![Arc::new(ids)])
        .context("Failed to create record batch")
}
/// Install the global tracing subscriber. Honors the filter from the
/// environment (`RUST_LOG`) when one is set, otherwise defaults to `info`;
/// event targets are suppressed for cleaner output.
fn init_tracing() {
    let env_filter = match EnvFilter::try_from_default_env() {
        Ok(filter) => filter,
        Err(_) => EnvFilter::new("info"),
    };
    tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .with_target(false)
        .init();
}
/// End-to-end example: connect to an R2 Data Catalog, ensure a namespace and
/// table exist, append a three-row Parquet data file via a transaction, then
/// read the table back and print the data.
#[tokio::main]
async fn main() -> Result<()> {
    // Load .env before anything reads environment-backed configuration.
    dotenvy::dotenv().ok();
    init_tracing();
    // Namespace/table come from argv when exactly two arguments are given;
    // otherwise fall back to env vars, then to hard-coded defaults.
    let args: Vec<String> = std::env::args().collect();
    let (namespace_name, table_name) = if args.len() == 3 {
        (args[1].clone(), args[2].clone())
    } else {
        let namespace =
            std::env::var("CLOUDFLARE_NAMESPACE").unwrap_or_else(|_| "default".to_string());
        let table = std::env::var("CLOUDFLARE_TABLE").unwrap_or_else(|_| "test_table".to_string());
        info!("Usage: {} <namespace> <table-name>", args[0]);
        info!("Using defaults: namespace={}, table={}", namespace, table);
        (namespace, table)
    };
    let catalog = create_r2_catalog_from_env()
        .await
        .context("Failed to connect to R2 Data Catalog")?;
    info!("✓ Connected to R2 Data Catalog");
    let namespace = NamespaceIdent::new(vec![namespace_name.clone()]);
    info!("Creating namespace: {}", namespace_name);
    match catalog
        .create_namespace(&namespace, Default::default())
        .await
    {
        Ok(_) => info!("✓ Created namespace: {}", namespace_name),
        // String-based sniffing for "already exists"/"Already exists"/"Conflict"
        // errors; the "lready" spelling intentionally matches both casings.
        // NOTE(review): brittle — prefer a typed error variant if the catalog
        // API exposes one.
        Err(e) if e.to_string().contains("lready exists") || e.to_string().contains("Conflict") => {
            info!("ℹ Namespace already exists: {}", namespace_name)
        }
        // Any other failure is logged but not fatal: the namespace may exist
        // despite the error, so we optimistically continue.
        Err(e) => {
            warn!("Failed to create namespace: {}", e);
            info!("ℹ Attempting to continue with existing namespace");
        }
    }
    let schema = build_schema()?;
    let table_ident = TableIdent::new(namespace.clone(), table_name.clone());
    info!(
        "Attempting to load table: {}.{}",
        namespace_name, table_name
    );
    // Load-or-create: try the existing table first and only create on failure,
    // reporting both the original load error and any create error.
    let table = match catalog.load_table(&table_ident).await {
        Ok(table) => {
            info!("✓ Loaded existing table: {}.{}", namespace_name, table_name);
            table
        }
        Err(load_err) => {
            info!("Table not found, attempting to create: {}", load_err);
            let table_creation = TableCreation::builder()
                .with_name(table_name.clone())
                .with_schema(schema.clone())
                .build()
                .context("Failed to build table creation")?;
            let table = catalog
                .create_table(&namespace, table_creation)
                .await
                .context(format!(
                    "Failed to create table '{}'. Load error: {}. Create error",
                    table_name, load_err
                ))?;
            info!("✓ Created table: {}.{}", namespace_name, table_name);
            table
        }
    };
    let batch = create_sample_data(&schema)?;
    // Writer uses the table's current schema (not the locally built one) so
    // field ids line up with what the catalog actually stored.
    let mut writer = ParquetWriter::new(
        table
            .metadata()
            .current_schema()
            .context("Failed to load current schema")?
            .clone(),
    )
    .context("Failed to create Parquet writer")?;
    writer
        .write_batch(&batch)
        .context("Failed to write batch")?;
    // Random UUID in the object key avoids collisions across repeated runs.
    let file_path = format!(
        "{}/data/file-{}.parquet",
        table.location(),
        uuid::Uuid::new_v4()
    );
    let data_file = writer
        .finish(table.file_io(), file_path.clone())
        .await
        .context("Failed to finish writing Parquet file")?;
    info!("✓ Wrote {} rows to {}", batch.num_rows(), file_path);
    // Commit the new data file as an append snapshot.
    table
        .transaction()
        .append(vec![data_file])
        .commit(&catalog)
        .await
        .context("Failed to commit transaction")?;
    info!("✓ Committed snapshot to table");
    info!("--- Reading data back ---");
    // Re-load to pick up the freshly committed snapshot before scanning.
    let table = catalog.load_table(&table_ident).await?;
    let files = table.files().await?;
    info!("✓ Found {} data file(s)", files.len());
    for file in &files {
        info!(
            "Data file {} ({} records, {} bytes)",
            file.file_path, file.record_count, file.file_size_in_bytes
        );
    }
    let scan = table.scan().build()?;
    let mut stream = scan.to_arrow().await?;
    use futures::StreamExt;
    let mut total_rows = 0;
    info!("Reading batches:");
    while let Some(batch_result) = stream.next().await {
        let batch = batch_result?;
        total_rows += batch.num_rows();
        info!("Batch: {} rows", batch.num_rows());
        // True only for the first batch (total so far equals this batch's
        // rows), so the pretty-printed sample is shown exactly once.
        if total_rows == batch.num_rows() {
            use arrow::util::pretty::print_batches;
            info!("Sample data:");
            print_batches(&[batch])?;
        }
    }
    info!("✓ Read {} total rows", total_rows);
    Ok(())
}