Skip to main content

floe_core/config/
catalog.rs

1use std::collections::HashMap;
2
3use crate::config::{
4    CatalogDefinition, CatalogTypeConfig, EntityConfig, ResolvedPath, RootConfig, SinkTarget,
5    StorageResolver,
6};
7use crate::{ConfigError, FloeResult};
8
/// Turns an arbitrary string into an identifier usable as a catalog namespace,
/// table, or database name.
///
/// Each character is handled independently: ASCII letters are lowercased, ASCII
/// digits, `_` and `-` are kept as they are, and every other character (including
/// any non-ASCII character) becomes a single `_`. Underscores at the very start and
/// end of the result are then stripped; hyphens are not stripped.
///
/// If nothing is left after stripping (empty input, or input made up only of
/// replaced characters and underscores), the literal `"default"` is returned, so
/// the result is never empty.
pub(crate) fn normalize_catalog_ident(value: &str) -> String {
    let sanitized: String = value
        .chars()
        .map(|c| match c {
            '_' | '-' => c,
            c if c.is_ascii_alphanumeric() => c.to_ascii_lowercase(),
            _ => '_',
        })
        .collect();

    match sanitized.trim_matches('_') {
        // Nothing usable survived normalization: fall back to a fixed name.
        "" => String::from("default"),
        core => core.to_owned(),
    }
}
29
30#[derive(Debug, Clone)]
31pub struct CatalogResolver {
32    has_config: bool,
33    default_name: Option<String>,
34    definitions: HashMap<String, CatalogDefinition>,
35}
36
37/// Fully resolved Iceberg catalog target for a single entity write/seed operation.
38/// Type-specific fields (region, database, etc.) are carried in `type_config`.
39#[derive(Debug, Clone)]
40pub struct ResolvedIcebergCatalogTarget {
41    pub catalog_name: String,
42    pub type_config: CatalogTypeConfig,
43    pub namespace: String,
44    pub table: String,
45    pub table_location: ResolvedPath,
46}
47
48/// Fully resolved Unity Catalog target for a single Delta write operation.
49/// The table storage location is taken directly from the write `Target` — Unity Catalog
50/// is a post-write registration step and does not influence the write path.
51#[derive(Debug, Clone)]
52pub struct ResolvedDeltaCatalogTarget {
53    pub catalog_name: String,
54    pub type_config: CatalogTypeConfig,
55    pub schema: String,
56    pub table: String,
57}
58
59impl CatalogResolver {
60    pub fn new(config: &RootConfig) -> FloeResult<Self> {
61        let Some(catalogs) = &config.catalogs else {
62            return Ok(Self {
63                has_config: false,
64                default_name: None,
65                definitions: HashMap::new(),
66            });
67        };
68
69        let mut definitions = HashMap::new();
70        for definition in &catalogs.definitions {
71            if definitions
72                .insert(definition.name.clone(), definition.clone())
73                .is_some()
74            {
75                return Err(Box::new(ConfigError(format!(
76                    "catalogs.definitions name={} is duplicated",
77                    definition.name
78                ))));
79            }
80        }
81
82        Ok(Self {
83            has_config: true,
84            default_name: catalogs.default.clone(),
85            definitions,
86        })
87    }
88
89    pub fn definition(&self, name: &str) -> Option<CatalogDefinition> {
90        if !self.has_config {
91            return None;
92        }
93        self.definitions.get(name).cloned()
94    }
95
96    pub fn default_name(&self) -> Option<String> {
97        self.default_name.clone()
98    }
99
100    pub fn resolve_iceberg_target(
101        &self,
102        storage_resolver: &StorageResolver,
103        entity: &EntityConfig,
104        sink: &SinkTarget,
105    ) -> FloeResult<Option<ResolvedIcebergCatalogTarget>> {
106        let Some(iceberg_cfg) = sink.iceberg.as_ref() else {
107            return Ok(None);
108        };
109
110        let catalog_name = match iceberg_cfg.catalog.as_deref() {
111            Some(name) => name.to_string(),
112            None => self.default_name.clone().ok_or_else(|| {
113                Box::new(ConfigError(format!(
114                    "entity.name={} sink.accepted.iceberg.catalog is required when no catalogs.default is set",
115                    entity.name
116                ))) as Box<dyn std::error::Error + Send + Sync>
117            })?,
118        };
119        let definition = self.definition(&catalog_name).ok_or_else(|| {
120            Box::new(ConfigError(format!(
121                "entity.name={} sink.accepted.iceberg.catalog references unknown catalog {}",
122                entity.name, catalog_name
123            ))) as Box<dyn std::error::Error + Send + Sync>
124        })?;
125
126        // namespace and table names use the shared normalizer — same rules for all catalog types
127        let database_for_namespace = match &definition.type_config {
128            CatalogTypeConfig::Glue { database, .. } => database.as_str(),
129            // REST warehouse is a catalog/bucket identifier (e.g. "my_catalog.my_schema"),
130            // not a namespace — use "default" so callers always set namespace/domain explicitly.
131            CatalogTypeConfig::Rest { .. } => "default",
132            // Unity catalogs are for Delta, not Iceberg — validate.rs prevents this path.
133            CatalogTypeConfig::Unity { .. } => {
134                return Err(Box::new(ConfigError(format!(
135                    "entity.name={} sink.accepted.iceberg.catalog references a unity catalog, which only supports Delta Lake",
136                    entity.name
137                ))) as Box<dyn std::error::Error + Send + Sync>);
138            }
139        };
140        let namespace_source = iceberg_cfg
141            .namespace
142            .as_deref()
143            .or(entity.domain.as_deref())
144            .unwrap_or(database_for_namespace);
145        let namespace = normalize_catalog_ident(namespace_source);
146        let table_source = iceberg_cfg.table.as_deref().unwrap_or(entity.name.as_str());
147        let table = normalize_catalog_ident(table_source);
148
149        let table_location = if let Some(location) = iceberg_cfg.location.as_deref() {
150            let storage_name = definition
151                .warehouse_storage
152                .as_deref()
153                .or(sink.storage.as_deref());
154            storage_resolver.resolve_path(
155                &entity.name,
156                "sink.accepted.iceberg.location",
157                storage_name,
158                location,
159            )?
160        } else if definition.warehouse_storage.is_some() || definition.warehouse_prefix.is_some() {
161            let mut relative = String::new();
162            if let Some(prefix) = definition.warehouse_prefix.as_deref() {
163                relative.push_str(prefix.trim_matches('/'));
164            }
165            if !namespace.is_empty() {
166                if !relative.is_empty() {
167                    relative.push('/');
168                }
169                relative.push_str(namespace.as_str());
170            }
171            if !table.is_empty() {
172                if !relative.is_empty() {
173                    relative.push('/');
174                }
175                relative.push_str(table.as_str());
176            }
177            let storage_name = definition
178                .warehouse_storage
179                .as_deref()
180                .or(sink.storage.as_deref());
181            storage_resolver.resolve_path(
182                &entity.name,
183                "sink.accepted.iceberg.location",
184                storage_name,
185                &relative,
186            )?
187        } else {
188            storage_resolver.resolve_path(
189                &entity.name,
190                "sink.accepted.storage",
191                sink.storage.as_deref(),
192                &sink.path,
193            )?
194        };
195
196        Ok(Some(ResolvedIcebergCatalogTarget {
197            catalog_name,
198            type_config: definition.type_config,
199            namespace,
200            table,
201            table_location,
202        }))
203    }
204
205    /// Resolves a Unity Catalog target for a Delta Lake write.
206    ///
207    /// Returns `Ok(None)` when `sink.delta` is absent (no catalog registration requested).
208    pub fn resolve_delta_target(
209        &self,
210        entity: &EntityConfig,
211        sink: &SinkTarget,
212    ) -> FloeResult<Option<ResolvedDeltaCatalogTarget>> {
213        let Some(delta_cfg) = sink.delta.as_ref() else {
214            return Ok(None);
215        };
216
217        let catalog_name = match delta_cfg.catalog.as_deref() {
218            Some(name) => name.to_string(),
219            None => self.default_name.clone().ok_or_else(|| {
220                Box::new(ConfigError(format!(
221                    "entity.name={} sink.accepted.delta.catalog is required when no catalogs.default is set",
222                    entity.name
223                ))) as Box<dyn std::error::Error + Send + Sync>
224            })?,
225        };
226
227        let definition = self.definition(&catalog_name).ok_or_else(|| {
228            Box::new(ConfigError(format!(
229                "entity.name={} sink.accepted.delta.catalog references unknown catalog {}",
230                entity.name, catalog_name
231            ))) as Box<dyn std::error::Error + Send + Sync>
232        })?;
233
234        // Schema resolution: entity override → entity domain → catalog definition schema field.
235        let schema_source = delta_cfg.schema.as_deref().or(entity.domain.as_deref());
236        let schema = match (schema_source, &definition.type_config) {
237            (Some(s), _) => normalize_catalog_ident(s),
238            (None, CatalogTypeConfig::Unity { schema, .. }) => normalize_catalog_ident(schema),
239            (None, _) => "default".to_string(),
240        };
241
242        let table_source = delta_cfg.table.as_deref().unwrap_or(entity.name.as_str());
243        let table = normalize_catalog_ident(table_source);
244
245        Ok(Some(ResolvedDeltaCatalogTarget {
246            catalog_name,
247            type_config: definition.type_config,
248            schema,
249            table,
250        }))
251    }
252}