Skip to main content

floe_core/config/
catalog.rs

1use std::collections::HashMap;
2
3use crate::config::{
4    CatalogDefinition, EntityConfig, ResolvedPath, RootConfig, SinkTarget, StorageResolver,
5};
6use crate::{ConfigError, FloeResult};
7
8#[derive(Debug, Clone)]
9pub struct CatalogResolver {
10    has_config: bool,
11    default_name: Option<String>,
12    definitions: HashMap<String, CatalogDefinition>,
13}
14
15#[derive(Debug, Clone)]
16pub struct ResolvedIcebergCatalogTarget {
17    pub catalog_name: String,
18    pub catalog_type: String,
19    pub region: String,
20    pub database: String,
21    pub namespace: String,
22    pub table: String,
23    pub table_location: ResolvedPath,
24}
25
26impl CatalogResolver {
27    pub fn new(config: &RootConfig) -> FloeResult<Self> {
28        let Some(catalogs) = &config.catalogs else {
29            return Ok(Self {
30                has_config: false,
31                default_name: None,
32                definitions: HashMap::new(),
33            });
34        };
35
36        let mut definitions = HashMap::new();
37        for definition in &catalogs.definitions {
38            if definitions
39                .insert(definition.name.clone(), definition.clone())
40                .is_some()
41            {
42                return Err(Box::new(ConfigError(format!(
43                    "catalogs.definitions name={} is duplicated",
44                    definition.name
45                ))));
46            }
47        }
48
49        Ok(Self {
50            has_config: true,
51            default_name: catalogs.default.clone(),
52            definitions,
53        })
54    }
55
56    pub fn definition(&self, name: &str) -> Option<CatalogDefinition> {
57        if !self.has_config {
58            return None;
59        }
60        self.definitions.get(name).cloned()
61    }
62
63    pub fn resolve_iceberg_target(
64        &self,
65        storage_resolver: &StorageResolver,
66        entity: &EntityConfig,
67        sink: &SinkTarget,
68    ) -> FloeResult<Option<ResolvedIcebergCatalogTarget>> {
69        let Some(iceberg_cfg) = sink.iceberg.as_ref() else {
70            return Ok(None);
71        };
72
73        let catalog_name = match iceberg_cfg.catalog.as_deref() {
74            Some(name) => name.to_string(),
75            None => self.default_name.clone().ok_or_else(|| {
76                Box::new(ConfigError(format!(
77                    "entity.name={} sink.accepted.iceberg.catalog is required when no catalogs.default is set",
78                    entity.name
79                ))) as Box<dyn std::error::Error + Send + Sync>
80            })?,
81        };
82        let definition = self.definition(&catalog_name).ok_or_else(|| {
83            Box::new(ConfigError(format!(
84                "entity.name={} sink.accepted.iceberg.catalog references unknown catalog {}",
85                entity.name, catalog_name
86            ))) as Box<dyn std::error::Error + Send + Sync>
87        })?;
88
89        let region = definition.region.clone().ok_or_else(|| {
90            Box::new(ConfigError(format!(
91                "catalogs.definitions name={} requires region for type {}",
92                definition.name, definition.catalog_type
93            ))) as Box<dyn std::error::Error + Send + Sync>
94        })?;
95        let database = normalize_glue_ident(definition.database.as_deref().ok_or_else(|| {
96            Box::new(ConfigError(format!(
97                "catalogs.definitions name={} requires database for type {}",
98                definition.name, definition.catalog_type
99            ))) as Box<dyn std::error::Error + Send + Sync>
100        })?);
101
102        let namespace_source = iceberg_cfg
103            .namespace
104            .as_deref()
105            .or(entity.domain.as_deref())
106            .unwrap_or(database.as_str());
107        let namespace = normalize_glue_ident(namespace_source);
108        let table_source = iceberg_cfg.table.as_deref().unwrap_or(entity.name.as_str());
109        let table = normalize_glue_ident(table_source);
110
111        let table_location = if let Some(location) = iceberg_cfg.location.as_deref() {
112            let storage_name = definition
113                .warehouse_storage
114                .as_deref()
115                .or(sink.storage.as_deref());
116            storage_resolver.resolve_path(
117                &entity.name,
118                "sink.accepted.iceberg.location",
119                storage_name,
120                location,
121            )?
122        } else if definition.warehouse_storage.is_some() || definition.warehouse_prefix.is_some() {
123            let mut relative = String::new();
124            if let Some(prefix) = definition.warehouse_prefix.as_deref() {
125                relative.push_str(prefix.trim_matches('/'));
126            }
127            if !namespace.is_empty() {
128                if !relative.is_empty() {
129                    relative.push('/');
130                }
131                relative.push_str(namespace.as_str());
132            }
133            if !table.is_empty() {
134                if !relative.is_empty() {
135                    relative.push('/');
136                }
137                relative.push_str(table.as_str());
138            }
139            let storage_name = definition
140                .warehouse_storage
141                .as_deref()
142                .or(sink.storage.as_deref());
143            storage_resolver.resolve_path(
144                &entity.name,
145                "sink.accepted.iceberg.location",
146                storage_name,
147                &relative,
148            )?
149        } else {
150            storage_resolver.resolve_path(
151                &entity.name,
152                "sink.accepted.storage",
153                sink.storage.as_deref(),
154                &sink.path,
155            )?
156        };
157
158        Ok(Some(ResolvedIcebergCatalogTarget {
159            catalog_name,
160            catalog_type: definition.catalog_type,
161            region,
162            database,
163            namespace,
164            table,
165            table_location,
166        }))
167    }
168}
169
/// Normalizes `value` into a lowercase catalog identifier.
///
/// ASCII letters are lowercased; ASCII digits, `_` and `-` are kept; every
/// other character becomes `_`. Leading and trailing underscores are then
/// stripped. If nothing remains, the literal `"default"` is returned.
fn normalize_glue_ident(value: &str) -> String {
    let mapped: String = value
        .chars()
        .map(|c| match c {
            'a'..='z' | '0'..='9' | '_' | '-' => c,
            'A'..='Z' => c.to_ascii_lowercase(),
            _ => '_',
        })
        .collect();

    match mapped.trim_matches('_') {
        "" => String::from("default"),
        core => core.to_owned(),
    }
}