floe_core/config/
catalog.rs1use std::collections::HashMap;
2
3use crate::config::{
4 CatalogDefinition, CatalogTypeConfig, EntityConfig, ResolvedPath, RootConfig, SinkTarget,
5 StorageResolver,
6};
7use crate::{ConfigError, FloeResult};
8
/// Turns an arbitrary user-supplied name into a catalog-safe identifier.
///
/// Each character is mapped independently:
/// * ASCII uppercase letters are lowercased.
/// * ASCII lowercase letters, ASCII digits, `_` and `-` are kept as-is.
/// * Everything else (whitespace, punctuation, non-ASCII characters) becomes `_`.
///
/// Leading and trailing `_` are then stripped. If nothing is left, `"default"` is returned.
/// Interior runs of `_` are not collapsed.
pub(crate) fn normalize_catalog_ident(value: &str) -> String {
    let mapped: String = value
        .chars()
        .map(|ch| match ch {
            'a'..='z' | '0'..='9' | '_' | '-' => ch,
            'A'..='Z' => ch.to_ascii_lowercase(),
            _ => '_',
        })
        .collect();

    match mapped.trim_matches('_') {
        "" => String::from("default"),
        ident => ident.to_owned(),
    }
}
29
30#[derive(Debug, Clone)]
31pub struct CatalogResolver {
32 has_config: bool,
33 default_name: Option<String>,
34 definitions: HashMap<String, CatalogDefinition>,
35}
36
37#[derive(Debug, Clone)]
40pub struct ResolvedIcebergCatalogTarget {
41 pub catalog_name: String,
42 pub type_config: CatalogTypeConfig,
43 pub namespace: String,
44 pub table: String,
45 pub table_location: ResolvedPath,
46}
47
48#[derive(Debug, Clone)]
52pub struct ResolvedDeltaCatalogTarget {
53 pub catalog_name: String,
54 pub type_config: CatalogTypeConfig,
55 pub schema: String,
56 pub table: String,
57}
58
59impl CatalogResolver {
60 pub fn new(config: &RootConfig) -> FloeResult<Self> {
61 let Some(catalogs) = &config.catalogs else {
62 return Ok(Self {
63 has_config: false,
64 default_name: None,
65 definitions: HashMap::new(),
66 });
67 };
68
69 let mut definitions = HashMap::new();
70 for definition in &catalogs.definitions {
71 if definitions
72 .insert(definition.name.clone(), definition.clone())
73 .is_some()
74 {
75 return Err(Box::new(ConfigError(format!(
76 "catalogs.definitions name={} is duplicated",
77 definition.name
78 ))));
79 }
80 }
81
82 Ok(Self {
83 has_config: true,
84 default_name: catalogs.default.clone(),
85 definitions,
86 })
87 }
88
89 pub fn definition(&self, name: &str) -> Option<CatalogDefinition> {
90 if !self.has_config {
91 return None;
92 }
93 self.definitions.get(name).cloned()
94 }
95
96 pub fn default_name(&self) -> Option<String> {
97 self.default_name.clone()
98 }
99
100 pub fn resolve_iceberg_target(
101 &self,
102 storage_resolver: &StorageResolver,
103 entity: &EntityConfig,
104 sink: &SinkTarget,
105 ) -> FloeResult<Option<ResolvedIcebergCatalogTarget>> {
106 let Some(iceberg_cfg) = sink.iceberg.as_ref() else {
107 return Ok(None);
108 };
109
110 let catalog_name = match iceberg_cfg.catalog.as_deref() {
111 Some(name) => name.to_string(),
112 None => self.default_name.clone().ok_or_else(|| {
113 Box::new(ConfigError(format!(
114 "entity.name={} sink.accepted.iceberg.catalog is required when no catalogs.default is set",
115 entity.name
116 ))) as Box<dyn std::error::Error + Send + Sync>
117 })?,
118 };
119 let definition = self.definition(&catalog_name).ok_or_else(|| {
120 Box::new(ConfigError(format!(
121 "entity.name={} sink.accepted.iceberg.catalog references unknown catalog {}",
122 entity.name, catalog_name
123 ))) as Box<dyn std::error::Error + Send + Sync>
124 })?;
125
126 let database_for_namespace = match &definition.type_config {
128 CatalogTypeConfig::Glue { database, .. } => database.as_str(),
129 CatalogTypeConfig::Rest { .. } => "default",
132 CatalogTypeConfig::Unity { .. } => {
134 return Err(Box::new(ConfigError(format!(
135 "entity.name={} sink.accepted.iceberg.catalog references a unity catalog, which only supports Delta Lake",
136 entity.name
137 ))) as Box<dyn std::error::Error + Send + Sync>);
138 }
139 };
140 let namespace_source = iceberg_cfg
141 .namespace
142 .as_deref()
143 .or(entity.domain.as_deref())
144 .unwrap_or(database_for_namespace);
145 let namespace = normalize_catalog_ident(namespace_source);
146 let table_source = iceberg_cfg.table.as_deref().unwrap_or(entity.name.as_str());
147 let table = normalize_catalog_ident(table_source);
148
149 let table_location = if let Some(location) = iceberg_cfg.location.as_deref() {
150 let storage_name = definition
151 .warehouse_storage
152 .as_deref()
153 .or(sink.storage.as_deref());
154 storage_resolver.resolve_path(
155 &entity.name,
156 "sink.accepted.iceberg.location",
157 storage_name,
158 location,
159 )?
160 } else if definition.warehouse_storage.is_some() || definition.warehouse_prefix.is_some() {
161 let mut relative = String::new();
162 if let Some(prefix) = definition.warehouse_prefix.as_deref() {
163 relative.push_str(prefix.trim_matches('/'));
164 }
165 if !namespace.is_empty() {
166 if !relative.is_empty() {
167 relative.push('/');
168 }
169 relative.push_str(namespace.as_str());
170 }
171 if !table.is_empty() {
172 if !relative.is_empty() {
173 relative.push('/');
174 }
175 relative.push_str(table.as_str());
176 }
177 let storage_name = definition
178 .warehouse_storage
179 .as_deref()
180 .or(sink.storage.as_deref());
181 storage_resolver.resolve_path(
182 &entity.name,
183 "sink.accepted.iceberg.location",
184 storage_name,
185 &relative,
186 )?
187 } else {
188 storage_resolver.resolve_path(
189 &entity.name,
190 "sink.accepted.storage",
191 sink.storage.as_deref(),
192 &sink.path,
193 )?
194 };
195
196 Ok(Some(ResolvedIcebergCatalogTarget {
197 catalog_name,
198 type_config: definition.type_config,
199 namespace,
200 table,
201 table_location,
202 }))
203 }
204
205 pub fn resolve_delta_target(
209 &self,
210 entity: &EntityConfig,
211 sink: &SinkTarget,
212 ) -> FloeResult<Option<ResolvedDeltaCatalogTarget>> {
213 let Some(delta_cfg) = sink.delta.as_ref() else {
214 return Ok(None);
215 };
216
217 let catalog_name = match delta_cfg.catalog.as_deref() {
218 Some(name) => name.to_string(),
219 None => self.default_name.clone().ok_or_else(|| {
220 Box::new(ConfigError(format!(
221 "entity.name={} sink.accepted.delta.catalog is required when no catalogs.default is set",
222 entity.name
223 ))) as Box<dyn std::error::Error + Send + Sync>
224 })?,
225 };
226
227 let definition = self.definition(&catalog_name).ok_or_else(|| {
228 Box::new(ConfigError(format!(
229 "entity.name={} sink.accepted.delta.catalog references unknown catalog {}",
230 entity.name, catalog_name
231 ))) as Box<dyn std::error::Error + Send + Sync>
232 })?;
233
234 let schema_source = delta_cfg.schema.as_deref().or(entity.domain.as_deref());
236 let schema = match (schema_source, &definition.type_config) {
237 (Some(s), _) => normalize_catalog_ident(s),
238 (None, CatalogTypeConfig::Unity { schema, .. }) => normalize_catalog_ident(schema),
239 (None, _) => "default".to_string(),
240 };
241
242 let table_source = delta_cfg.table.as_deref().unwrap_or(entity.name.as_str());
243 let table = normalize_catalog_ident(table_source);
244
245 Ok(Some(ResolvedDeltaCatalogTarget {
246 catalog_name,
247 type_config: definition.type_config,
248 schema,
249 table,
250 }))
251 }
252}