floe_core/config/
catalog.rs1use std::collections::HashMap;
2
3use crate::config::{
4 CatalogDefinition, EntityConfig, ResolvedPath, RootConfig, SinkTarget, StorageResolver,
5};
6use crate::{ConfigError, FloeResult};
7
8#[derive(Debug, Clone)]
9pub struct CatalogResolver {
10 has_config: bool,
11 default_name: Option<String>,
12 definitions: HashMap<String, CatalogDefinition>,
13}
14
15#[derive(Debug, Clone)]
16pub struct ResolvedIcebergCatalogTarget {
17 pub catalog_name: String,
18 pub catalog_type: String,
19 pub region: String,
20 pub database: String,
21 pub namespace: String,
22 pub table: String,
23 pub table_location: ResolvedPath,
24}
25
26impl CatalogResolver {
27 pub fn new(config: &RootConfig) -> FloeResult<Self> {
28 let Some(catalogs) = &config.catalogs else {
29 return Ok(Self {
30 has_config: false,
31 default_name: None,
32 definitions: HashMap::new(),
33 });
34 };
35
36 let mut definitions = HashMap::new();
37 for definition in &catalogs.definitions {
38 if definitions
39 .insert(definition.name.clone(), definition.clone())
40 .is_some()
41 {
42 return Err(Box::new(ConfigError(format!(
43 "catalogs.definitions name={} is duplicated",
44 definition.name
45 ))));
46 }
47 }
48
49 Ok(Self {
50 has_config: true,
51 default_name: catalogs.default.clone(),
52 definitions,
53 })
54 }
55
56 pub fn definition(&self, name: &str) -> Option<CatalogDefinition> {
57 if !self.has_config {
58 return None;
59 }
60 self.definitions.get(name).cloned()
61 }
62
63 pub fn resolve_iceberg_target(
64 &self,
65 storage_resolver: &StorageResolver,
66 entity: &EntityConfig,
67 sink: &SinkTarget,
68 ) -> FloeResult<Option<ResolvedIcebergCatalogTarget>> {
69 let Some(iceberg_cfg) = sink.iceberg.as_ref() else {
70 return Ok(None);
71 };
72
73 let catalog_name = match iceberg_cfg.catalog.as_deref() {
74 Some(name) => name.to_string(),
75 None => self.default_name.clone().ok_or_else(|| {
76 Box::new(ConfigError(format!(
77 "entity.name={} sink.accepted.iceberg.catalog is required when no catalogs.default is set",
78 entity.name
79 ))) as Box<dyn std::error::Error + Send + Sync>
80 })?,
81 };
82 let definition = self.definition(&catalog_name).ok_or_else(|| {
83 Box::new(ConfigError(format!(
84 "entity.name={} sink.accepted.iceberg.catalog references unknown catalog {}",
85 entity.name, catalog_name
86 ))) as Box<dyn std::error::Error + Send + Sync>
87 })?;
88
89 let region = definition.region.clone().ok_or_else(|| {
90 Box::new(ConfigError(format!(
91 "catalogs.definitions name={} requires region for type {}",
92 definition.name, definition.catalog_type
93 ))) as Box<dyn std::error::Error + Send + Sync>
94 })?;
95 let database = normalize_glue_ident(definition.database.as_deref().ok_or_else(|| {
96 Box::new(ConfigError(format!(
97 "catalogs.definitions name={} requires database for type {}",
98 definition.name, definition.catalog_type
99 ))) as Box<dyn std::error::Error + Send + Sync>
100 })?);
101
102 let namespace_source = iceberg_cfg
103 .namespace
104 .as_deref()
105 .or(entity.domain.as_deref())
106 .unwrap_or(database.as_str());
107 let namespace = normalize_glue_ident(namespace_source);
108 let table_source = iceberg_cfg.table.as_deref().unwrap_or(entity.name.as_str());
109 let table = normalize_glue_ident(table_source);
110
111 let table_location = if let Some(location) = iceberg_cfg.location.as_deref() {
112 let storage_name = definition
113 .warehouse_storage
114 .as_deref()
115 .or(sink.storage.as_deref());
116 storage_resolver.resolve_path(
117 &entity.name,
118 "sink.accepted.iceberg.location",
119 storage_name,
120 location,
121 )?
122 } else if definition.warehouse_storage.is_some() || definition.warehouse_prefix.is_some() {
123 let mut relative = String::new();
124 if let Some(prefix) = definition.warehouse_prefix.as_deref() {
125 relative.push_str(prefix.trim_matches('/'));
126 }
127 if !namespace.is_empty() {
128 if !relative.is_empty() {
129 relative.push('/');
130 }
131 relative.push_str(namespace.as_str());
132 }
133 if !table.is_empty() {
134 if !relative.is_empty() {
135 relative.push('/');
136 }
137 relative.push_str(table.as_str());
138 }
139 let storage_name = definition
140 .warehouse_storage
141 .as_deref()
142 .or(sink.storage.as_deref());
143 storage_resolver.resolve_path(
144 &entity.name,
145 "sink.accepted.iceberg.location",
146 storage_name,
147 &relative,
148 )?
149 } else {
150 storage_resolver.resolve_path(
151 &entity.name,
152 "sink.accepted.storage",
153 sink.storage.as_deref(),
154 &sink.path,
155 )?
156 };
157
158 Ok(Some(ResolvedIcebergCatalogTarget {
159 catalog_name,
160 catalog_type: definition.catalog_type,
161 region,
162 database,
163 namespace,
164 table,
165 table_location,
166 }))
167 }
168}
169
/// Normalizes `value` into a lowercase identifier.
///
/// ASCII letters are lowercased; ASCII digits, `_` and `-` are kept as-is; every
/// other character becomes `_`. Leading and trailing underscores are then
/// stripped, and `"default"` is returned when nothing remains.
fn normalize_glue_ident(value: &str) -> String {
    let sanitized: String = value
        .chars()
        .map(|c| match c {
            'a'..='z' | '0'..='9' | '_' | '-' => c,
            'A'..='Z' => c.to_ascii_lowercase(),
            _ => '_',
        })
        .collect();

    match sanitized.trim_matches('_') {
        "" => String::from("default"),
        core => core.to_owned(),
    }
}