use std::collections::HashMap;
use futures::TryStreamExt;
use paimon::catalog::Identifier;
use paimon::common::{CatalogOptions, Options};
use paimon::spec::{DataType, IntType, Schema, VarCharType};
use paimon::CatalogFactory;
/// Builds the two-column schema used by the example table.
///
/// The schema has an `id` column of the default `Int` type and a `name`
/// column of type `VarChar` with length 255.
///
/// # Panics
///
/// Panics if the `VarChar` type cannot be constructed or if the schema builder
/// rejects the column definitions.
fn create_test_schema() -> Schema {
    // Name the column types up front so the builder chain reads as a plain
    // list of (name, type) pairs.
    let id_type = DataType::Int(IntType::new());
    let name_type = DataType::VarChar(VarCharType::new(255).unwrap());

    Schema::builder()
        .column("id", id_type)
        .column("name", name_type)
        .build()
        .expect("Failed to build schema")
}
/// Renders the value at `row` of an Arrow array as a display string.
///
/// Returns `"null"` when the slot is null. Otherwise the array is probed, in
/// order, as `Int32Array`, `Int64Array`, `StringArray` and `LargeStringArray`,
/// and the first matching type supplies the value. Any other array type yields
/// an `<unsupported type: ...>` placeholder that names the array's data type.
fn array_value_to_string(array: &dyn arrow_array::Array, row: usize) -> String {
    use arrow_array::*;

    // The null check comes first: it applies to every array type, and the
    // typed accessors below are only consulted for non-null slots.
    if array.is_null(row) {
        return "null".to_string();
    }

    // Resolve the `Any` view once and try each supported concrete type in turn.
    let any = array.as_any();
    any.downcast_ref::<Int32Array>()
        .map(|ints| ints.value(row).to_string())
        .or_else(|| {
            any.downcast_ref::<Int64Array>()
                .map(|longs| longs.value(row).to_string())
        })
        .or_else(|| {
            any.downcast_ref::<StringArray>()
                .map(|strings| strings.value(row).to_string())
        })
        .or_else(|| {
            any.downcast_ref::<LargeStringArray>()
                .map(|strings| strings.value(row).to_string())
        })
        .unwrap_or_else(|| format!("<unsupported type: {:?}>", array.data_type()))
}
#[tokio::main]
async fn main() {
let mut options = Options::new();
options.set(CatalogOptions::METASTORE, "rest");
options.set(CatalogOptions::WAREHOUSE, "paimon_catalog");
options.set(CatalogOptions::URI, "http://sample.net/");
options.set(CatalogOptions::TOKEN_PROVIDER, "dlf");
options.set("dlf.region", "cn-hangzhou");
options.set(
"dlf.access-key-id",
std::env::var("DLF_ACCESS_KEY_ID").expect("DLF_ACCESS_KEY_ID env var not set"),
);
options.set(
"dlf.access-key-secret",
std::env::var("DLF_ACCESS_KEY_SECRET").expect("DLF_ACCESS_KEY_SECRET env var not set"),
);
println!("Creating RESTCatalog instance...");
let catalog = match CatalogFactory::create(options).await {
Ok(catalog) => catalog,
Err(err) => {
eprintln!("Failed to create RESTCatalog: {err}");
return;
}
};
println!("\n=== Part 1: Database Operations ===\n");
println!("Listing databases...");
match catalog.list_databases().await {
Ok(databases) => {
println!("Databases found: {databases:?}");
println!("Total count: {}", databases.len());
}
Err(err) => {
eprintln!("Failed to list databases: {err}");
}
}
println!("\nCreating database 'example_db'...");
match catalog
.create_database("example_db", false, HashMap::new())
.await
{
Ok(()) => println!("Database created successfully"),
Err(err) => eprintln!("Failed to create database: {err}"),
}
println!("\nGetting database info for 'example_db'...");
match catalog.get_database("example_db").await {
Ok(database) => println!("Database: {database:?}"),
Err(err) => eprintln!("Failed to get database: {err}"),
}
println!("\n=== Part 2: Table Operations ===\n");
let table_identifier = Identifier::new("example_db", "users");
println!("Creating table '{table_identifier}'...");
let schema = create_test_schema();
match catalog.create_table(&table_identifier, schema, false).await {
Ok(()) => println!("Table created successfully"),
Err(err) => eprintln!("Failed to create table: {err}"),
}
println!("\nListing tables in 'example_db'...");
match catalog.list_tables("example_db").await {
Ok(tables) => {
println!("Tables found: {tables:?}");
}
Err(err) => {
eprintln!("Failed to list tables: {err}");
}
}
println!("\nGetting table info for '{table_identifier}'...");
match catalog.get_table(&table_identifier).await {
Ok(table) => {
println!("Table location: {}", table.location());
println!("Table schema fields: {:?}", table.schema().fields());
}
Err(err) => eprintln!("Failed to get table: {err}"),
}
let renamed_identifier = Identifier::new("example_db", "users_renamed");
println!("\nRenaming table '{table_identifier}' to '{renamed_identifier}'...");
match catalog
.rename_table(&table_identifier, &renamed_identifier, false)
.await
{
Ok(()) => println!("Table renamed successfully"),
Err(err) => eprintln!("Failed to rename table: {err}"),
}
println!("\n=== Part 3: Read Data from Existing Table ===\n");
let read_table_identifier = Identifier::new("example_db", "users_renamed");
println!("Attempting to read from table '{read_table_identifier}'...");
match catalog.get_table(&read_table_identifier).await {
Ok(table) => {
println!("Table retrieved successfully");
println!(" Location: {}", table.location());
println!(" Schema fields: {:?}", table.schema().fields());
println!("\nScanning table...");
let read_builder = table.new_read_builder();
let scan = read_builder.new_scan();
match scan.plan().await {
Ok(plan) => {
println!(" Number of splits: {}", plan.splits().len());
if plan.splits().is_empty() {
println!("No data splits found — the table may be empty.");
} else {
println!("\nReading table data...");
match read_builder.new_read() {
Ok(read) => match read.to_arrow(plan.splits()) {
Ok(stream) => {
let batches: Vec<_> =
stream.try_collect().await.unwrap_or_default();
println!("Collected {} record batch(es)", batches.len());
let mut total_rows = 0;
for (batch_index, batch) in batches.iter().enumerate() {
let num_rows = batch.num_rows();
total_rows += num_rows;
println!(
"\n--- Batch {} ({} rows, {} columns) ---",
batch_index,
num_rows,
batch.num_columns()
);
println!("Schema: {}", batch.schema());
let display_rows = num_rows.min(10);
for row in 0..display_rows {
let mut row_values = Vec::new();
for col in 0..batch.num_columns() {
let column = batch.column(col);
row_values.push(array_value_to_string(column, row));
}
println!(" Row {}: [{}]", row, row_values.join(", "));
}
if num_rows > display_rows {
println!(
" ... ({} more rows omitted)",
num_rows - display_rows
);
}
}
println!("\n=== Read Summary ===");
println!("Total rows read: {total_rows}");
println!("Total batches: {}", batches.len());
}
Err(err) => {
eprintln!("Failed to create arrow stream: {err}");
}
},
Err(err) => {
eprintln!("Failed to create table read: {err}");
}
}
}
}
Err(err) => {
eprintln!("Failed to plan scan: {err}");
}
}
}
Err(err) => {
eprintln!(
"Failed to get table '{read_table_identifier}' (this is expected if the table doesn't exist): {err}"
);
}
}
println!("\n=== Cleanup ===\n");
println!("\nDropping table '{renamed_identifier}'...");
match catalog.drop_table(&renamed_identifier, false).await {
Ok(()) => println!("Table dropped successfully"),
Err(err) => eprintln!("Failed to drop table: {err}"),
}
println!("Dropping database 'example_db'...");
match catalog.drop_database("example_db", false, true).await {
Ok(()) => println!("Database dropped successfully"),
Err(err) => eprintln!("Failed to drop database: {err}"),
}
println!("\nExample completed!");
}