use std::collections::HashMap;
use std::path::Path;
use std::time::Instant;

use arrow::record_batch::RecordBatch;
use iceberg::memory::{MemoryCatalogBuilder, MEMORY_CATALOG_WAREHOUSE};
use iceberg::spec::{Schema, UnboundPartitionSpec};
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableIdent};
use polars::prelude::DataFrame;

use crate::errors::RunError;
use crate::io::format::{
AcceptedSinkAdapter, AcceptedWriteMetrics, AcceptedWriteOutput, AcceptedWritePerfBreakdown,
};
use crate::io::storage::Target;
use crate::{config, io, FloeResult};

use super::metrics;
mod context;
mod data_files;
mod glue;
mod metadata;
mod schema;
use self::context::{
build_iceberg_write_context, create_table, ensure_namespace, sanitize_table_name,
};
use self::data_files::{iceberg_small_file_threshold_bytes, write_data_files};
use self::glue::{load_glue_table_state, upsert_glue_table};
use self::metadata::parse_metadata_version_from_location;
use self::schema::{ensure_partition_spec_matches, ensure_schema_matches, prepare_iceberg_write};
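
/// Zero-sized adapter that routes accepted-sink writes into Apache Iceberg tables.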
struct IcebergAcceptedAdapter;
static ICEBERG_ACCEPTED_ADAPTER: IcebergAcceptedAdapter = IcebergAcceptedAdapter;
const ICEBERG_NAMESPACE: &str = "floe";
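/// Returns the process-wide Iceberg `AcceptedSinkAdapter` instance.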
pub(crate) fn iceberg_accepted_adapter() -> &'static dyn AcceptedSinkAdapter {
&ICEBERG_ACCEPTED_ADAPTER
}
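
/// Converted Arrow batch together with the Iceberg schema (and optional
/// partition spec) it must be written under; built by `prepare_iceberg_write`.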
#[derive(Debug)]
struct PreparedIcebergWrite {
iceberg_schema: Schema,
partition_spec: Option<UnboundPartitionSpec>,
batch: RecordBatch,
}
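
/// Raw results of the async write, later reshaped into an `AcceptedWriteOutput`.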
#[derive(Debug)]
struct IcebergWriteResult {
files_written: u64,
snapshot_id: Option<i64>,
metadata_version: Option<i64>,
file_paths: Vec<String>,
metrics: AcceptedWriteMetrics,
table_root_uri: String,
iceberg_catalog_name: Option<String>,
iceberg_database: Option<String>,
iceberg_namespace: Option<String>,
iceberg_table: Option<String>,
perf: AcceptedWritePerfBreakdown,
}
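
/// Borrowed cloud client and resolvers, present only when writing through the
/// sink adapter; the direct `write_iceberg_table` path passes none.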
struct IcebergRemoteContext<'a> {
cloud: &'a mut io::storage::CloudClient,
resolver: &'a config::StorageResolver,
catalogs: &'a config::CatalogResolver,
}
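
/// Resolved destination for one write: table root URI, in-memory catalog name
/// and properties, the last known metadata location, and optional Glue wiring.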
struct IcebergWriteContext {
table_root_uri: String,
catalog_name: &'static str,
catalog_props: HashMap<String, String>,
metadata_location: Option<String>,
glue_catalog: Option<GlueIcebergCatalogConfig>,
}
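
/// Coordinates of the mirrored table in the AWS Glue Data Catalog.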
#[derive(Debug, Clone)]
struct GlueIcebergCatalogConfig {
catalog_name: String,
region: String,
database: String,
namespace: String,
table: String,
}
impl AcceptedSinkAdapter for IcebergAcceptedAdapter {
fn write_accepted(
&self,
target: &Target,
df: &mut DataFrame,
mode: config::WriteMode,
_output_stem: &str,
_temp_dir: Option<&Path>,
cloud: &mut io::storage::CloudClient,
resolver: &config::StorageResolver,
catalogs: &config::CatalogResolver,
entity: &config::EntityConfig,
) -> FloeResult<AcceptedWriteOutput> {
write_iceberg_table_with_remote_context(
df,
target,
entity,
mode,
Some(IcebergRemoteContext {
cloud,
resolver,
catalogs,
}),
)
}
}
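
/// Writes `df` into an Iceberg table rooted at `target`, creating the table on
/// first use. Supports append and overwrite; merge modes are rejected.
///
/// A minimal calling sketch (constructing `target` and `entity` is
/// configuration-specific; `load_dataframe` is a hypothetical helper):
///
/// ```ignore
/// let mut df = load_dataframe()?; // hypothetical: any polars DataFrame works
/// let output = write_iceberg_table(&mut df, &target, &entity, config::WriteMode::Append)?;
/// println!(
///     "wrote {} data files, snapshot {:?}",
///     output.parts_written, output.snapshot_id
/// );
/// ```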
pub fn write_iceberg_table(
df: &mut DataFrame,
target: &Target,
entity: &config::EntityConfig,
mode: config::WriteMode,
) -> FloeResult<AcceptedWriteOutput> {
write_iceberg_table_with_remote_context(df, target, entity, mode, None)
}
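
/// Shared write path for the public entry point and the sink adapter: builds
/// the write context, converts the DataFrame to a single Arrow batch, then
/// drives the async commit on a dedicated current-thread Tokio runtime.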
fn write_iceberg_table_with_remote_context(
df: &mut DataFrame,
target: &Target,
entity: &config::EntityConfig,
mode: config::WriteMode,
mut remote: Option<IcebergRemoteContext<'_>>,
) -> FloeResult<AcceptedWriteOutput> {
let write_ctx = build_iceberg_write_context(target, entity, mode, remote.as_mut())?;
let conversion_start = Instant::now();
let prepared = prepare_iceberg_write(df, entity)?;
let conversion_ms = conversion_start.elapsed().as_millis() as u64;
let small_file_threshold_bytes = iceberg_small_file_threshold_bytes(entity);
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|err| Box::new(RunError(format!("iceberg runtime init failed: {err}"))))?;
let mut result = runtime.block_on(write_iceberg_table_async(
write_ctx,
prepared,
entity,
mode,
small_file_threshold_bytes,
))?;
result.perf.conversion_ms = Some(conversion_ms);
Ok(AcceptedWriteOutput {
files_written: Some(result.files_written),
parts_written: result.files_written,
part_files: result.file_paths,
table_version: result.metadata_version,
snapshot_id: result.snapshot_id,
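// Only report a table root when a Glue catalog is attached; otherwise leave it unset.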
table_root_uri: result
.iceberg_catalog_name
.as_ref()
.map(|_| result.table_root_uri.clone()),
iceberg_catalog_name: result.iceberg_catalog_name,
iceberg_database: result.iceberg_database,
iceberg_namespace: result.iceberg_namespace,
iceberg_table: result.iceberg_table,
metrics: result.metrics,
merge: None,
schema_evolution: crate::io::format::AcceptedSchemaEvolution {
enabled: false,
mode: entity
.schema
.resolved_schema_evolution()
.mode
.as_str()
.to_string(),
applied: false,
added_columns: Vec::new(),
incompatible_changes_detected: false,
},
perf: Some(result.perf),
})
}
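
/// Core async write: resolves Glue state, stands up an in-memory catalog over
/// the table root, registers or creates the table, commits the batch with a
/// fast-append, and mirrors the new metadata pointer back to Glue if configured.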
async fn write_iceberg_table_async(
write_ctx: IcebergWriteContext,
prepared: PreparedIcebergWrite,
entity: &config::EntityConfig,
mode: config::WriteMode,
small_file_threshold_bytes: u64,
) -> FloeResult<IcebergWriteResult> {
let IcebergWriteContext {
table_root_uri,
catalog_name,
mut catalog_props,
mut metadata_location,
glue_catalog,
} = write_ctx;
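// A configured Glue catalog is the source of truth for the current metadata
// pointer, overriding whatever the write context resolved.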
let mut glue_table_state = None;
if let Some(glue_cfg) = glue_catalog.as_ref() {
let state = load_glue_table_state(glue_cfg).await?;
metadata_location = state.metadata_location.clone();
glue_table_state = Some(state);
}
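// The memory catalog is scratch space for this single write; pointing its
// warehouse at the table root keeps new metadata files next to the data.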
catalog_props.insert(MEMORY_CATALOG_WAREHOUSE.to_string(), table_root_uri.clone());
let catalog = MemoryCatalogBuilder::default()
.load(catalog_name, catalog_props)
.await
.map_err(map_iceberg_err("iceberg catalog init failed"))?;
let namespace_name = glue_catalog
.as_ref()
.map(|cfg| cfg.namespace.clone())
.unwrap_or_else(|| ICEBERG_NAMESPACE.to_string());
let namespace = NamespaceIdent::new(namespace_name);
ensure_namespace(&catalog, &namespace).await?;
let table_name = glue_catalog
.as_ref()
.map(|cfg| cfg.table.clone())
.unwrap_or_else(|| sanitize_table_name(&entity.name));
let table_ident = TableIdent::new(namespace.clone(), table_name);
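// A known metadata location means the table already exists: register it so the
// commit extends its snapshot history instead of starting fresh.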
let existing_table = if let Some(location) = metadata_location.as_ref() {
Some(
catalog
.register_table(&table_ident, location.clone())
.await
.map_err(map_iceberg_err("iceberg register existing table failed"))?,
)
} else {
None
};
if let Some(existing) = existing_table.as_ref() {
ensure_schema_matches(
existing.metadata().current_schema(),
&prepared.iceberg_schema,
entity,
)?;
ensure_partition_spec_matches(
existing.metadata().default_partition_spec(),
prepared.partition_spec.as_ref(),
&prepared.iceberg_schema,
entity,
)?;
}
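// Append reuses (or lazily creates) the table; overwrite recreates it; merge
// modes are only implemented for delta sinks.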
let table = match mode {
config::WriteMode::Append => match existing_table {
Some(table) => table,
None => {
create_table(
&catalog,
&namespace,
&table_ident,
table_root_uri.clone(),
&prepared.iceberg_schema,
prepared.partition_spec.clone(),
)
.await?
}
},
config::WriteMode::Overwrite => {
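// Dropping only removes the in-memory catalog mapping; data files already on
// storage are not deleted here.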
if existing_table.is_some() {
catalog
.drop_table(&table_ident)
.await
.map_err(map_iceberg_err("iceberg drop table mapping failed"))?;
}
create_table(
&catalog,
&namespace,
&table_ident,
table_root_uri.clone(),
&prepared.iceberg_schema,
prepared.partition_spec.clone(),
)
.await?
}
config::WriteMode::MergeScd1 | config::WriteMode::MergeScd2 => {
return Err(Box::new(RunError(format!(
"entity.name={} sink.write_mode={} is only supported for delta accepted sinks",
entity.name,
mode.as_str()
))));
}
};
let mut file_paths = Vec::new();
let mut file_sizes = Vec::new();
let mut files_written = 0_u64;
let mut table_after_write = table;
let mut perf = AcceptedWritePerfBreakdown::default();
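// Empty batches skip the write and commit entirely; snapshot and metadata
// info below then reflect the table as created or registered.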
if prepared.batch.num_rows() > 0 {
let data_write_start = Instant::now();
let data_files = write_data_files(&table_after_write, prepared.batch).await?;
perf.data_write_ms = Some(data_write_start.elapsed().as_millis() as u64);
files_written = data_files.len() as u64;
file_sizes = data_files
.iter()
.map(iceberg::spec::DataFile::file_size_in_bytes)
.collect();
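// Report basenames only, capped at 50 entries.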
file_paths = data_files
.iter()
.map(|file| {
let file_path = file.file_path().to_string();
Path::new(file_path.as_str())
.file_name()
.and_then(|name| name.to_str())
.map(ToOwned::to_owned)
.unwrap_or(file_path)
})
.take(50)
.collect();
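// Commit every new data file in a single fast-append snapshot.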
let tx = Transaction::new(&table_after_write);
let action = tx.fast_append().add_data_files(data_files);
let tx = action
.apply(tx)
.map_err(map_iceberg_err("iceberg append transaction apply failed"))?;
let commit_start = Instant::now();
table_after_write = tx
.commit(&catalog)
.await
.map_err(map_iceberg_err("iceberg commit failed"))?;
perf.commit_ms = Some(commit_start.elapsed().as_millis() as u64);
}
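// Read back the committed snapshot id and metadata version for reporting.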
let snapshot_id = table_after_write
.metadata()
.current_snapshot()
.map(|snapshot| snapshot.snapshot_id());
let metadata_version = table_after_write
.metadata_location()
.and_then(parse_metadata_version_from_location);
let final_metadata_location = table_after_write
.metadata_location()
.map(|value| value.to_string())
.ok_or_else(|| {
Box::new(RunError(
"iceberg table metadata location missing after commit".to_string(),
)) as Box<dyn std::error::Error + Send + Sync>
})?;
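// Publish the new metadata pointer to Glue; the previously read version id
// acts as an optimistic-concurrency guard.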
if let Some(glue_cfg) = glue_catalog.as_ref() {
upsert_glue_table(
glue_cfg,
&table_root_uri,
&final_metadata_location,
glue_table_state
.as_ref()
.and_then(|state| state.version_id.as_deref()),
)
.await?;
}
let metrics_start = Instant::now();
let metrics = metrics::summarize_written_file_sizes(
&file_sizes,
files_written,
small_file_threshold_bytes,
);
perf.metrics_read_ms = Some(metrics_start.elapsed().as_millis() as u64);
Ok(IcebergWriteResult {
files_written,
snapshot_id,
metadata_version,
file_paths,
metrics,
table_root_uri,
iceberg_catalog_name: glue_catalog.as_ref().map(|cfg| cfg.catalog_name.clone()),
iceberg_database: glue_catalog.as_ref().map(|cfg| cfg.database.clone()),
iceberg_namespace: glue_catalog.as_ref().map(|cfg| cfg.namespace.clone()),
iceberg_table: glue_catalog.as_ref().map(|cfg| cfg.table.clone()),
perf,
})
}
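
/// Builds a closure that prefixes `context` onto an Iceberg error message and
/// boxes it as the crate-wide error type.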
fn map_iceberg_err(
context: &'static str,
) -> impl FnOnce(iceberg::Error) -> Box<dyn std::error::Error + Send + Sync> {
move |err| Box::new(RunError(format!("{context}: {err}")))
}