floe_core/checks/
normalize.rs1use std::collections::{HashMap, HashSet};
2
3use polars::prelude::DataFrame;
4
5use crate::{config, ConfigError, FloeResult};
6
7pub fn resolve_normalize_strategy(entity: &config::EntityConfig) -> FloeResult<Option<String>> {
8 let normalize = match &entity.schema.normalize_columns {
9 Some(config) => config.enabled.unwrap_or(false),
10 None => false,
11 };
12 if !normalize {
13 return Ok(None);
14 }
15 let raw = entity
16 .schema
17 .normalize_columns
18 .as_ref()
19 .and_then(|config| config.strategy.as_deref())
20 .unwrap_or("snake_case");
21 let normalized = normalize_strategy_name(raw);
22 match normalized.as_str() {
23 "snakecase" | "lower" | "camelcase" | "none" => Ok(Some(normalized)),
24 _ => Err(Box::new(ConfigError(format!(
25 "unsupported normalize_columns.strategy: {raw}"
26 )))),
27 }
28}
29
30pub fn resolve_source_columns(
31 columns: &[config::ColumnConfig],
32 strategy: Option<&str>,
33 keep_sources: bool,
34) -> FloeResult<Vec<config::ColumnConfig>> {
35 let mut resolved = Vec::with_capacity(columns.len());
36 let mut seen = HashMap::new();
37 for column in columns {
38 let source_name = column.source_or_name();
39 let normalized_name = if let Some(strategy) = strategy {
40 normalize_name(source_name, strategy)
41 } else {
42 source_name.to_string()
43 };
44 if let Some(existing) = seen.insert(normalized_name.clone(), source_name.to_string()) {
45 return Err(Box::new(ConfigError(format!(
46 "column source collision: {} and {} -> {}",
47 existing, column.name, normalized_name
48 ))));
49 }
50 resolved.push(config::ColumnConfig {
51 name: normalized_name,
52 source: if keep_sources {
53 Some(source_name.to_string())
54 } else {
55 None
56 },
57 column_type: column.column_type.clone(),
58 nullable: column.nullable,
59 unique: column.unique,
60 width: column.width,
61 trim: column.trim,
62 });
63 }
64 Ok(resolved)
65}
66
67pub fn source_column_mapping(
68 columns: &[config::ColumnConfig],
69 strategy: Option<&str>,
70) -> FloeResult<HashMap<String, String>> {
71 let mut mapping = HashMap::new();
72 let mut seen = HashSet::new();
73 for column in columns {
74 let Some(source) = column.source.as_deref() else {
75 continue;
76 };
77 let normalized = if let Some(strategy) = strategy {
78 normalize_name(source, strategy)
79 } else {
80 source.to_string()
81 };
82 if !seen.insert(normalized.clone()) {
83 return Err(Box::new(ConfigError(format!(
84 "column source collision: duplicate source selector {}",
85 normalized
86 ))));
87 }
88 mapping.insert(normalized, source.to_string());
89 }
90 Ok(mapping)
91}
92
93pub fn output_column_mapping(
94 columns: &[config::ColumnConfig],
95 strategy: Option<&str>,
96) -> FloeResult<HashMap<String, String>> {
97 let mut mapping = HashMap::new();
98 let mut targets = HashMap::new();
99 for column in columns {
100 let source_name = column.source_or_name();
101 let normalized_source = if let Some(strategy) = strategy {
102 normalize_name(source_name, strategy)
103 } else {
104 source_name.to_string()
105 };
106 let target_name = if column.source.is_some() {
107 column.name.clone()
108 } else if let Some(strategy) = strategy {
109 normalize_name(&column.name, strategy)
110 } else {
111 column.name.clone()
112 };
113 if let Some(existing) = targets.insert(target_name.clone(), normalized_source.clone()) {
114 return Err(Box::new(ConfigError(format!(
115 "output column name collision: {} and {} -> {}",
116 existing, normalized_source, target_name
117 ))));
118 }
119 if normalized_source != target_name {
120 mapping.insert(normalized_source, target_name);
121 }
122 }
123 Ok(mapping)
124}
125
126pub fn pii_schema_to_runtime_mapping(
131 columns: &[config::ColumnConfig],
132 strategy: Option<&str>,
133) -> HashMap<String, String> {
134 let mut mapping = HashMap::new();
135 for column in columns {
136 let runtime_name = if column.source.is_some() {
137 column.name.clone()
138 } else if let Some(strategy) = strategy {
139 normalize_name(&column.name, strategy)
140 } else {
141 column.name.clone()
142 };
143 if column.name != runtime_name {
144 mapping.insert(column.name.clone(), runtime_name);
145 }
146 }
147 mapping
148}
149
150pub fn resolve_output_columns(
151 columns: &[config::ColumnConfig],
152 strategy: Option<&str>,
153) -> Vec<config::ColumnConfig> {
154 columns
155 .iter()
156 .map(|column| {
157 let name = if column.source.is_some() {
158 column.name.clone()
159 } else if let Some(strategy) = strategy {
160 normalize_name(&column.name, strategy)
161 } else {
162 column.name.clone()
163 };
164 config::ColumnConfig {
165 name,
166 source: None,
167 column_type: column.column_type.clone(),
168 nullable: column.nullable,
169 unique: column.unique,
170 width: column.width,
171 trim: column.trim,
172 }
173 })
174 .collect()
175}
176
177pub fn rename_output_columns(
178 df: &mut DataFrame,
179 mapping: &HashMap<String, String>,
180) -> FloeResult<()> {
181 if mapping.is_empty() {
182 return Ok(());
183 }
184 let names = df
185 .get_column_names()
186 .iter()
187 .map(|name| name.to_string())
188 .collect::<Vec<_>>();
189 let mut renamed = Vec::with_capacity(names.len());
190 for name in &names {
191 if let Some(target) = mapping.get(name) {
192 renamed.push(target.clone());
193 } else {
194 renamed.push(name.clone());
195 }
196 }
197 df.set_column_names(renamed.iter()).map_err(|err| {
198 Box::new(ConfigError(format!(
199 "failed to rename output columns: {err}"
200 )))
201 })?;
202 Ok(())
203}
204
205pub fn normalize_dataframe_columns(df: &mut DataFrame, strategy: &str) -> FloeResult<()> {
206 let names = df.get_column_names();
207 let mut normalized_names = Vec::with_capacity(names.len());
208 let mut seen = HashMap::new();
209 for name in names {
210 let normalized = normalize_name(name, strategy);
211 if let Some(existing) = seen.insert(normalized.clone(), name.to_string()) {
212 return Err(Box::new(ConfigError(format!(
213 "normalized input column collision: {} and {} -> {}",
214 existing, name, normalized
215 ))));
216 }
217 normalized_names.push(normalized);
218 }
219 df.set_column_names(normalized_names.iter())
220 .map_err(|err| {
221 Box::new(ConfigError(format!(
222 "failed to normalize column names: {err}"
223 )))
224 })?;
225 Ok(())
226}
227
/// Canonicalizes a strategy name: ASCII-lowercases it and removes every `-`
/// and `_`, so `"Snake_Case"`, `"snake-case"` and `"snakecase"` all compare
/// equal. Other characters (including whitespace) are left untouched.
fn normalize_strategy_name(value: &str) -> String {
    value
        .chars()
        .filter(|ch| !matches!(ch, '-' | '_'))
        .map(|ch| ch.to_ascii_lowercase())
        .collect()
}
231
232pub fn normalize_name(value: &str, strategy: &str) -> String {
233 match normalize_strategy_name(strategy).as_str() {
234 "snakecase" => to_snake_case(value),
235 "lower" => value.to_ascii_lowercase(),
236 "camelcase" => to_camel_case(value),
237 "none" => value.to_string(),
238 _ => value.to_string(),
239 }
240}
241
242fn to_snake_case(value: &str) -> String {
243 split_words(value).join("_")
244}
245
246fn to_camel_case(value: &str) -> String {
247 let words = split_words(value);
248 if words.is_empty() {
249 return String::new();
250 }
251 let mut out = String::new();
252 out.push_str(&words[0]);
253 for word in words.iter().skip(1) {
254 out.push_str(&capitalize(word));
255 }
256 out
257}
258
/// Splits `value` into lowercase words.
///
/// Any character that is not ASCII alphanumeric acts as a separator and is
/// dropped. Inside a run of alphanumerics a new word starts at an uppercase
/// letter when it follows a lowercase letter or digit (`fooBar`, `id2Name`),
/// or when it follows an uppercase letter and precedes a lowercase one (the
/// `S` in `HTTPServer`). Every emitted character is ASCII-lowercased.
fn split_words(value: &str) -> Vec<String> {
    let mut words: Vec<String> = Vec::new();
    let mut word = String::new();
    // The raw character immediately before the current one, separators included.
    let mut previous: Option<char> = None;
    let mut rest = value.chars().peekable();

    while let Some(ch) = rest.next() {
        let before = previous.replace(ch);

        if !ch.is_ascii_alphanumeric() {
            // Separator: close the word in progress, if any.
            if !word.is_empty() {
                words.push(std::mem::take(&mut word));
            }
            continue;
        }

        if ch.is_ascii_uppercase() && !word.is_empty() {
            let after_lower_or_digit =
                matches!(before, Some(p) if p.is_ascii_lowercase() || p.is_ascii_digit());
            // End of an acronym: upper, upper, then lower ("HTTPServer").
            let acronym_boundary = matches!(before, Some(p) if p.is_ascii_uppercase())
                && matches!(rest.peek(), Some(n) if n.is_ascii_lowercase());

            if after_lower_or_digit || acronym_boundary {
                words.push(std::mem::take(&mut word));
            }
        }

        word.push(ch.to_ascii_lowercase());
    }

    if !word.is_empty() {
        words.push(word);
    }

    words
}
297
/// Returns `value` with its first character ASCII-uppercased and the rest
/// unchanged. An empty input yields an empty string.
fn capitalize(value: &str) -> String {
    let mut rest = value.chars();
    let Some(first) = rest.next() else {
        return String::new();
    };

    let mut out = String::with_capacity(value.len());
    out.push(first.to_ascii_uppercase());
    out.push_str(rest.as_str());
    out
}
304}