use std::collections::HashMap;
use crate::config::{
CatalogDefinition, CatalogTypeConfig, EntityConfig, ResolvedPath, RootConfig, SinkTarget,
StorageResolver,
};
use crate::{ConfigError, FloeResult};
/// Converts an arbitrary string into a catalog-safe identifier.
///
/// ASCII letters are lower-cased; ASCII digits, `_` and `-` are kept as they
/// are; every other character (including non-ASCII) is replaced by `_`.
/// Leading and trailing underscores are then stripped. If nothing is left
/// after stripping, the literal `"default"` is returned, so the result is
/// never empty.
pub(crate) fn normalize_catalog_ident(value: &str) -> String {
    let sanitized: String = value
        .chars()
        .map(|c| match c {
            'A'..='Z' => c.to_ascii_lowercase(),
            'a'..='z' | '0'..='9' | '_' | '-' => c,
            _ => '_',
        })
        .collect();

    // Only underscores are trimmed; hyphens at the edges are preserved.
    match sanitized.trim_matches('_') {
        "" => String::from("default"),
        core => core.to_owned(),
    }
}
/// Name-keyed lookup of the catalog definitions declared in the root config.
///
/// Constructed by `CatalogResolver::new`, which copies the definitions and the
/// optional default catalog name out of the config's `catalogs` section.
#[derive(Debug, Clone)]
pub struct CatalogResolver {
/// `true` when the root config contained a `catalogs` section. When `false`,
/// `definition` always returns `None`.
has_config: bool,
/// Catalog name used when a sink does not name a catalog explicitly
/// (copied from `catalogs.default`); `None` when no default is configured.
default_name: Option<String>,
/// Catalog definitions keyed by their `name`; `new` rejects duplicate names.
definitions: HashMap<String, CatalogDefinition>,
}
/// Outcome of resolving an Iceberg sink against the configured catalogs, as
/// produced by `CatalogResolver::resolve_iceberg_target`.
#[derive(Debug, Clone)]
pub struct ResolvedIcebergCatalogTarget {
/// Name of the catalog the sink resolved to (the sink's explicit catalog, or
/// the configured default).
pub catalog_name: String,
/// Type-specific configuration taken from the matched catalog definition.
pub type_config: CatalogTypeConfig,
/// Namespace after normalization with `normalize_catalog_ident`.
pub namespace: String,
/// Table name after normalization with `normalize_catalog_ident`.
pub table: String,
/// Table location as resolved by the `StorageResolver`.
pub table_location: ResolvedPath,
}
/// Outcome of resolving a Delta sink against the configured catalogs, as
/// produced by `CatalogResolver::resolve_delta_target`.
#[derive(Debug, Clone)]
pub struct ResolvedDeltaCatalogTarget {
/// Name of the catalog the sink resolved to (the sink's explicit catalog, or
/// the configured default).
pub catalog_name: String,
/// Type-specific configuration taken from the matched catalog definition.
pub type_config: CatalogTypeConfig,
/// Schema name after normalization, or the literal `"default"` when no
/// schema source is available.
pub schema: String,
/// Table name after normalization with `normalize_catalog_ident`.
pub table: String,
}
impl CatalogResolver {
    /// Builds a resolver from the optional `catalogs` section of `config`.
    ///
    /// When the section is absent the resolver is empty and every lookup
    /// misses.
    ///
    /// # Errors
    ///
    /// Returns a `ConfigError` when two catalog definitions share a name.
    pub fn new(config: &RootConfig) -> FloeResult<Self> {
        let catalogs = match &config.catalogs {
            Some(section) => section,
            None => {
                return Ok(Self {
                    has_config: false,
                    default_name: None,
                    definitions: HashMap::new(),
                });
            }
        };

        let mut definitions: HashMap<String, CatalogDefinition> = HashMap::new();
        for definition in &catalogs.definitions {
            if definitions.contains_key(&definition.name) {
                return Err(Self::config_error(format!(
                    "catalogs.definitions name={} is duplicated",
                    definition.name
                )));
            }
            definitions.insert(definition.name.clone(), definition.clone());
        }

        Ok(Self {
            has_config: true,
            default_name: catalogs.default.clone(),
            definitions,
        })
    }

    /// Returns a copy of the catalog definition registered under `name`, or
    /// `None` when there is no `catalogs` section or no such definition.
    pub fn definition(&self, name: &str) -> Option<CatalogDefinition> {
        if self.has_config {
            self.definitions.get(name).cloned()
        } else {
            None
        }
    }

    /// Returns a copy of the configured default catalog name, if any.
    pub fn default_name(&self) -> Option<String> {
        self.default_name.as_deref().map(String::from)
    }

    /// Resolves the Iceberg catalog target for `sink`.
    ///
    /// Returns `Ok(None)` when the sink has no Iceberg configuration.
    /// Otherwise:
    ///
    /// * the catalog is the sink's explicit catalog, else the default one;
    /// * the namespace comes from the sink's namespace, else the entity's
    ///   domain, else the Glue database (or `"default"` for REST catalogs);
    /// * the table comes from the sink's table, else the entity name;
    /// * the location is the sink's explicit Iceberg location, else
    ///   `<warehouse_prefix>/<namespace>/<table>` when the catalog declares a
    ///   warehouse storage or prefix, else the sink's own path.
    ///
    /// Namespace and table are passed through `normalize_catalog_ident`.
    ///
    /// # Errors
    ///
    /// Returns a `ConfigError` when no catalog can be determined, when the
    /// named catalog is unknown, or when it is a Unity catalog. Errors from
    /// `StorageResolver::resolve_path` are propagated.
    pub fn resolve_iceberg_target(
        &self,
        storage_resolver: &StorageResolver,
        entity: &EntityConfig,
        sink: &SinkTarget,
    ) -> FloeResult<Option<ResolvedIcebergCatalogTarget>> {
        let iceberg_cfg = match sink.iceberg.as_ref() {
            Some(cfg) => cfg,
            None => return Ok(None),
        };

        // An explicit catalog on the sink wins over the configured default.
        let catalog_name = match iceberg_cfg
            .catalog
            .as_deref()
            .or(self.default_name.as_deref())
        {
            Some(name) => name.to_owned(),
            None => {
                return Err(Self::config_error(format!(
                    "entity.name={} sink.accepted.iceberg.catalog is required when no catalogs.default is set",
                    entity.name
                )));
            }
        };

        let definition = match self.definition(&catalog_name) {
            Some(found) => found,
            None => {
                return Err(Self::config_error(format!(
                    "entity.name={} sink.accepted.iceberg.catalog references unknown catalog {}",
                    entity.name, catalog_name
                )));
            }
        };

        // Last-resort namespace, used only when neither the sink nor the
        // entity provides one.
        let fallback_namespace = match &definition.type_config {
            CatalogTypeConfig::Glue { database, .. } => database.as_str(),
            CatalogTypeConfig::Rest { .. } => "default",
            CatalogTypeConfig::Unity { .. } => {
                return Err(Self::config_error(format!(
                    "entity.name={} sink.accepted.iceberg.catalog references a unity catalog, which only supports Delta Lake",
                    entity.name
                )));
            }
        };

        let namespace = normalize_catalog_ident(
            iceberg_cfg
                .namespace
                .as_deref()
                .or(entity.domain.as_deref())
                .unwrap_or(fallback_namespace),
        );
        let table = normalize_catalog_ident(
            iceberg_cfg
                .table
                .as_deref()
                .unwrap_or(entity.name.as_str()),
        );

        // Catalog-level warehouse storage takes precedence over the sink's.
        let catalog_storage = definition
            .warehouse_storage
            .as_deref()
            .or(sink.storage.as_deref());
        let has_warehouse =
            definition.warehouse_storage.is_some() || definition.warehouse_prefix.is_some();

        let table_location = match iceberg_cfg.location.as_deref() {
            Some(location) => storage_resolver.resolve_path(
                &entity.name,
                "sink.accepted.iceberg.location",
                catalog_storage,
                location,
            )?,
            None if has_warehouse => {
                // Join the non-empty segments of <prefix>/<namespace>/<table>.
                let mut segments: Vec<&str> = Vec::with_capacity(3);
                if let Some(prefix) = definition.warehouse_prefix.as_deref() {
                    segments.push(prefix.trim_matches('/'));
                }
                segments.push(namespace.as_str());
                segments.push(table.as_str());
                segments.retain(|segment| !segment.is_empty());
                let relative = segments.join("/");

                storage_resolver.resolve_path(
                    &entity.name,
                    "sink.accepted.iceberg.location",
                    catalog_storage,
                    &relative,
                )?
            }
            None => storage_resolver.resolve_path(
                &entity.name,
                "sink.accepted.storage",
                sink.storage.as_deref(),
                &sink.path,
            )?,
        };

        Ok(Some(ResolvedIcebergCatalogTarget {
            catalog_name,
            type_config: definition.type_config,
            namespace,
            table,
            table_location,
        }))
    }

    /// Resolves the Delta catalog target for `sink`.
    ///
    /// Returns `Ok(None)` when the sink has no Delta configuration.
    /// Otherwise:
    ///
    /// * the catalog is the sink's explicit catalog, else the default one;
    /// * the schema comes from the sink's schema, else the entity's domain,
    ///   else the Unity catalog's schema, else the literal `"default"`;
    /// * the table comes from the sink's table, else the entity name.
    ///
    /// Schema and table are passed through `normalize_catalog_ident`
    /// (except the literal `"default"` fallback, which is used as is).
    ///
    /// # Errors
    ///
    /// Returns a `ConfigError` when no catalog can be determined or when the
    /// named catalog is unknown.
    pub fn resolve_delta_target(
        &self,
        entity: &EntityConfig,
        sink: &SinkTarget,
    ) -> FloeResult<Option<ResolvedDeltaCatalogTarget>> {
        let delta_cfg = match sink.delta.as_ref() {
            Some(cfg) => cfg,
            None => return Ok(None),
        };

        // An explicit catalog on the sink wins over the configured default.
        let catalog_name = match delta_cfg
            .catalog
            .as_deref()
            .or(self.default_name.as_deref())
        {
            Some(name) => name.to_owned(),
            None => {
                return Err(Self::config_error(format!(
                    "entity.name={} sink.accepted.delta.catalog is required when no catalogs.default is set",
                    entity.name
                )));
            }
        };

        let definition = match self.definition(&catalog_name) {
            Some(found) => found,
            None => {
                return Err(Self::config_error(format!(
                    "entity.name={} sink.accepted.delta.catalog references unknown catalog {}",
                    entity.name, catalog_name
                )));
            }
        };

        let schema = if let Some(explicit) =
            delta_cfg.schema.as_deref().or(entity.domain.as_deref())
        {
            normalize_catalog_ident(explicit)
        } else if let CatalogTypeConfig::Unity { schema, .. } = &definition.type_config {
            normalize_catalog_ident(schema)
        } else {
            "default".to_string()
        };

        let table = normalize_catalog_ident(
            delta_cfg
                .table
                .as_deref()
                .unwrap_or(entity.name.as_str()),
        );

        Ok(Some(ResolvedDeltaCatalogTarget {
            catalog_name,
            type_config: definition.type_config,
            schema,
            table,
        }))
    }

    /// Boxes `message` as a `ConfigError` trait object, the error form used by
    /// every failure path in this resolver.
    fn config_error(message: String) -> Box<dyn std::error::Error + Send + Sync> {
        Box::new(ConfigError(message))
    }
}