use anyhow::{ensure, Context, Result};
use icepick::catalog::Catalog;
use icepick::spec::{
NamespaceIdent, NestedField, PrimitiveType, Schema, TableCreation, TableIdent, Type,
};
use icepick::writer::ParquetWriter;
use icepick::S3TablesCatalog;
use tracing::{info, warn};
use tracing_subscriber::EnvFilter;
/// Connects to the S3 Tables catalog addressed by `arn`.
///
/// The fixed label `"s3tables"` is passed as the first argument of
/// `S3TablesCatalog::from_arn` (presumably the catalog name; confirm against
/// the icepick API).
///
/// # Errors
///
/// Returns an `anyhow` error whose message starts with
/// `Failed to create S3 Tables catalog: ` followed by the rendered underlying
/// error. Only the rendered text is kept; the original error value is dropped.
async fn create_s3_tables_catalog(arn: &str) -> Result<S3TablesCatalog> {
    match S3TablesCatalog::from_arn("s3tables", arn).await {
        Ok(connected) => Ok(connected),
        Err(cause) => Err(anyhow::anyhow!(
            "Failed to create S3 Tables catalog: {}",
            cause
        )),
    }
}
/// Builds the Iceberg schema used by this example: a single required field
/// named `id` with field id `1` and primitive type `Long`.
///
/// # Errors
///
/// Returns an error with the context `Failed to build schema` when the schema
/// builder rejects the field list.
fn build_schema() -> Result<Schema> {
    let long_type = Type::Primitive(PrimitiveType::Long);
    let builder = Schema::builder().with_fields(vec![NestedField::required_field(
        1,
        "id".to_string(),
        long_type,
    )]);
    builder.build().context("Failed to build schema")
}
use arrow::array::Int64Array;
use arrow::record_batch::RecordBatch;
use icepick::arrow_convert::schema_to_arrow;
use std::sync::Arc;
/// Produces a three-row Arrow `RecordBatch` holding the values `1`, `2`, `3`
/// in a single `Int64Array` column.
///
/// The Arrow schema of the batch is derived from `iceberg_schema` via
/// `schema_to_arrow`, so the supplied schema is expected to describe exactly
/// one column compatible with `Int64Array`.
///
/// # Errors
///
/// Fails when the Iceberg schema cannot be converted to an Arrow schema, or
/// when `RecordBatch::try_new` rejects the schema/column combination.
fn create_sample_data(iceberg_schema: &Schema) -> Result<RecordBatch> {
    let ids = Int64Array::from(vec![1, 2, 3]);
    let converted = schema_to_arrow(iceberg_schema)
        .context("Failed to convert Iceberg schema to Arrow schema")?;
    let schema_ref = Arc::new(converted);

    RecordBatch::try_new(schema_ref, vec![Arc::new(ids)])
        .context("Failed to create record batch")
}
/// Installs the process-wide `tracing` formatter used for all log output.
///
/// The filter is taken from the default environment configuration when
/// `EnvFilter::try_from_default_env` succeeds; otherwise it falls back to the
/// `info` directive. Event targets are omitted from the formatted output.
fn init_tracing() {
    let env_filter = match EnvFilter::try_from_default_env() {
        Ok(from_env) => from_env,
        // Unset or unparsable environment filter: use a sensible default.
        Err(_) => EnvFilter::new("info"),
    };

    let subscriber = tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .with_target(false);
    subscriber.init();
}
/// Runs a full write/read round trip against an S3 Tables catalog.
///
/// Expects exactly three command-line arguments:
/// `<s3-tables-arn> <namespace> <table-name>`.
///
/// Steps performed, in order:
/// 1. Connect to the catalog identified by the ARN.
/// 2. Create the namespace (best effort: failures are logged and execution
///    continues, on the assumption that the namespace may already exist).
/// 3. Load the table, or create it with the schema from `build_schema` when
///    loading fails.
/// 4. Write the sample batch to a new Parquet file under
///    `<table location>/data/` and commit it as an append.
/// 5. Reload the table, list its data files, scan it, and print the first
///    non-empty batch as sample output.
///
/// # Errors
///
/// Returns an error when the argument count is wrong, or when any step other
/// than namespace creation fails. Each failure carries a context message
/// naming the step.
#[tokio::main]
async fn main() -> Result<()> {
    let args: Vec<String> = std::env::args().collect();
    init_tracing();

    // The argument vector may be empty on some platforms, so never index
    // `args[0]` while building the usage message.
    let program = args.first().map(String::as_str).unwrap_or("<program>");
    ensure!(
        args.len() == 4,
        "Usage: {} <s3-tables-arn> <namespace> <table-name>",
        program
    );
    let arn = &args[1];
    let namespace_name = &args[2];
    let table_name = &args[3];

    let catalog = create_s3_tables_catalog(arn)
        .await
        .context("Failed to connect to S3 Tables catalog")?;
    info!("✓ Connected to S3 Tables catalog");

    // --- Namespace (best effort) ---
    let namespace = NamespaceIdent::new(vec![namespace_name.clone()]);
    info!("Creating namespace: {}", namespace_name);
    match catalog
        .create_namespace(&namespace, Default::default())
        .await
    {
        Ok(_) => info!("✓ Created namespace: {}", namespace_name),
        Err(e) => {
            // Render the error once; the catalog reports "already exists"
            // conditions only through its message text.
            let message = e.to_string();
            if message.contains("lready exists") || message.contains("Conflict") {
                info!("ℹ Namespace already exists: {}", namespace_name);
            } else {
                warn!("Failed to create namespace: {}", message);
                info!("ℹ Attempting to continue with existing namespace");
            }
        }
    }

    // --- Table: load if present, otherwise create ---
    let schema = build_schema()?;
    let table_ident = TableIdent::new(namespace.clone(), table_name.clone());
    let table = match catalog.load_table(&table_ident).await {
        Ok(table) => {
            info!("✓ Loaded existing table: {}.{}", namespace_name, table_name);
            table
        }
        Err(load_err) => {
            // Any load failure is treated as "table missing", but the cause is
            // logged so that unrelated problems (permissions, network) remain
            // visible instead of being silently discarded.
            info!(
                "ℹ Could not load table {}.{} ({}); creating it",
                namespace_name, table_name, load_err
            );
            let table_creation = TableCreation::builder()
                .with_name(table_name.clone())
                .with_schema(schema.clone())
                .build()
                .context("Failed to build table creation")?;
            let table = catalog
                .create_table(&namespace, table_creation)
                .await
                .with_context(|| format!("Failed to create table '{}'", table_name))?;
            info!("✓ Created table: {}.{}", namespace_name, table_name);
            table
        }
    };

    // --- Write one Parquet data file and commit it ---
    let batch = create_sample_data(&schema)?;
    let mut writer = ParquetWriter::new(
        table
            .metadata()
            .current_schema()
            .context("Failed to load current schema")?
            .clone(),
    )
    .context("Failed to create Parquet writer")?;
    writer
        .write_batch(&batch)
        .context("Failed to write batch")?;

    // A random UUID keeps file names unique across repeated runs.
    let file_path = format!(
        "{}/data/file-{}.parquet",
        table.location(),
        uuid::Uuid::new_v4()
    );
    let data_file = writer
        .finish(table.file_io(), file_path.clone())
        .await
        .context("Failed to finish writing Parquet file")?;
    info!("✓ Wrote {} rows to {}", batch.num_rows(), file_path);

    table
        .transaction()
        .append(vec![data_file])
        .commit(&catalog)
        .await
        .context("Failed to commit transaction")?;
    info!("✓ Committed snapshot to table");

    // --- Read the data back ---
    info!("--- Reading data back ---");
    // Reload so that the freshly committed snapshot is visible.
    let table = catalog
        .load_table(&table_ident)
        .await
        .context("Failed to reload table after commit")?;
    let files = table
        .files()
        .await
        .context("Failed to list data files")?;
    info!("✓ Found {} data file(s)", files.len());
    for file in &files {
        info!(
            "Data file {} ({} records, {} bytes)",
            file.file_path, file.record_count, file.file_size_in_bytes
        );
    }

    let scan = table.scan().build().context("Failed to build table scan")?;
    let mut stream = scan
        .to_arrow()
        .await
        .context("Failed to start Arrow scan")?;

    use arrow::util::pretty::print_batches;
    use futures::StreamExt;

    let mut total_rows = 0;
    // Explicit flag: comparing the running total with the batch size would
    // also match later batches whenever the preceding batches were empty.
    let mut sample_printed = false;
    info!("Reading batches:");
    while let Some(batch_result) = stream.next().await {
        let batch = batch_result.context("Failed to read record batch")?;
        let rows = batch.num_rows();
        total_rows += rows;
        info!("Batch: {} rows", rows);
        if !sample_printed && rows > 0 {
            info!("Sample data:");
            print_batches(&[batch]).context("Failed to print sample data")?;
            sample_printed = true;
        }
    }
    info!("✓ Read {} total rows", total_rows);
    Ok(())
}