floe_core/checks/
normalize.rs1use std::collections::{HashMap, HashSet};
2
3use polars::prelude::DataFrame;
4
5use crate::{config, ConfigError, FloeResult};
6
7pub fn resolve_normalize_strategy(entity: &config::EntityConfig) -> FloeResult<Option<String>> {
8 let normalize = match &entity.schema.normalize_columns {
9 Some(config) => config.enabled.unwrap_or(false),
10 None => false,
11 };
12 if !normalize {
13 return Ok(None);
14 }
15 let raw = entity
16 .schema
17 .normalize_columns
18 .as_ref()
19 .and_then(|config| config.strategy.as_deref())
20 .unwrap_or("snake_case");
21 let normalized = normalize_strategy_name(raw);
22 match normalized.as_str() {
23 "snakecase" | "lower" | "camelcase" | "none" => Ok(Some(normalized)),
24 _ => Err(Box::new(ConfigError(format!(
25 "unsupported normalize_columns.strategy: {raw}"
26 )))),
27 }
28}
29
30pub fn resolve_source_columns(
31 columns: &[config::ColumnConfig],
32 strategy: Option<&str>,
33 keep_sources: bool,
34) -> FloeResult<Vec<config::ColumnConfig>> {
35 let mut resolved = Vec::with_capacity(columns.len());
36 let mut seen = HashMap::new();
37 for column in columns {
38 let source_name = column.source_or_name();
39 let normalized_name = if let Some(strategy) = strategy {
40 normalize_name(source_name, strategy)
41 } else {
42 source_name.to_string()
43 };
44 if let Some(existing) = seen.insert(normalized_name.clone(), source_name.to_string()) {
45 return Err(Box::new(ConfigError(format!(
46 "column source collision: {} and {} -> {}",
47 existing, column.name, normalized_name
48 ))));
49 }
50 resolved.push(config::ColumnConfig {
51 name: normalized_name,
52 source: if keep_sources {
53 Some(source_name.to_string())
54 } else {
55 None
56 },
57 column_type: column.column_type.clone(),
58 nullable: column.nullable,
59 unique: column.unique,
60 width: column.width,
61 trim: column.trim,
62 });
63 }
64 Ok(resolved)
65}
66
67pub fn source_column_mapping(
68 columns: &[config::ColumnConfig],
69 strategy: Option<&str>,
70) -> FloeResult<HashMap<String, String>> {
71 let mut mapping = HashMap::new();
72 let mut seen = HashSet::new();
73 for column in columns {
74 let Some(source) = column.source.as_deref() else {
75 continue;
76 };
77 let normalized = if let Some(strategy) = strategy {
78 normalize_name(source, strategy)
79 } else {
80 source.to_string()
81 };
82 if !seen.insert(normalized.clone()) {
83 return Err(Box::new(ConfigError(format!(
84 "column source collision: duplicate source selector {}",
85 normalized
86 ))));
87 }
88 mapping.insert(normalized, source.to_string());
89 }
90 Ok(mapping)
91}
92
93pub fn output_column_mapping(
94 columns: &[config::ColumnConfig],
95 strategy: Option<&str>,
96) -> FloeResult<HashMap<String, String>> {
97 let mut mapping = HashMap::new();
98 let mut targets = HashMap::new();
99 for column in columns {
100 let source_name = column.source_or_name();
101 let normalized_source = if let Some(strategy) = strategy {
102 normalize_name(source_name, strategy)
103 } else {
104 source_name.to_string()
105 };
106 let target_name = if column.source.is_some() {
107 column.name.clone()
108 } else if let Some(strategy) = strategy {
109 normalize_name(&column.name, strategy)
110 } else {
111 column.name.clone()
112 };
113 if let Some(existing) = targets.insert(target_name.clone(), normalized_source.clone()) {
114 return Err(Box::new(ConfigError(format!(
115 "output column name collision: {} and {} -> {}",
116 existing, normalized_source, target_name
117 ))));
118 }
119 if normalized_source != target_name {
120 mapping.insert(normalized_source, target_name);
121 }
122 }
123 Ok(mapping)
124}
125
126pub fn resolve_output_columns(
127 columns: &[config::ColumnConfig],
128 strategy: Option<&str>,
129) -> Vec<config::ColumnConfig> {
130 columns
131 .iter()
132 .map(|column| {
133 let name = if column.source.is_some() {
134 column.name.clone()
135 } else if let Some(strategy) = strategy {
136 normalize_name(&column.name, strategy)
137 } else {
138 column.name.clone()
139 };
140 config::ColumnConfig {
141 name,
142 source: None,
143 column_type: column.column_type.clone(),
144 nullable: column.nullable,
145 unique: column.unique,
146 width: column.width,
147 trim: column.trim,
148 }
149 })
150 .collect()
151}
152
153pub fn rename_output_columns(
154 df: &mut DataFrame,
155 mapping: &HashMap<String, String>,
156) -> FloeResult<()> {
157 if mapping.is_empty() {
158 return Ok(());
159 }
160 let names = df
161 .get_column_names()
162 .iter()
163 .map(|name| name.to_string())
164 .collect::<Vec<_>>();
165 let mut renamed = Vec::with_capacity(names.len());
166 for name in &names {
167 if let Some(target) = mapping.get(name) {
168 renamed.push(target.clone());
169 } else {
170 renamed.push(name.clone());
171 }
172 }
173 df.set_column_names(renamed.iter()).map_err(|err| {
174 Box::new(ConfigError(format!(
175 "failed to rename output columns: {err}"
176 )))
177 })?;
178 Ok(())
179}
180
181pub fn normalize_dataframe_columns(df: &mut DataFrame, strategy: &str) -> FloeResult<()> {
182 let names = df.get_column_names();
183 let mut normalized_names = Vec::with_capacity(names.len());
184 let mut seen = HashMap::new();
185 for name in names {
186 let normalized = normalize_name(name, strategy);
187 if let Some(existing) = seen.insert(normalized.clone(), name.to_string()) {
188 return Err(Box::new(ConfigError(format!(
189 "normalized input column collision: {} and {} -> {}",
190 existing, name, normalized
191 ))));
192 }
193 normalized_names.push(normalized);
194 }
195 df.set_column_names(normalized_names.iter())
196 .map_err(|err| {
197 Box::new(ConfigError(format!(
198 "failed to normalize column names: {err}"
199 )))
200 })?;
201 Ok(())
202}
203
/// Canonicalises a strategy name: ASCII letters are lower-cased and every
/// `-` and `_` is removed, so `"snake_case"`, `"Snake-Case"` and
/// `"SNAKECASE"` all become `"snakecase"`. Other characters pass through
/// unchanged.
fn normalize_strategy_name(value: &str) -> String {
    value
        .chars()
        .filter(|ch| !matches!(ch, '-' | '_'))
        .map(|ch| ch.to_ascii_lowercase())
        .collect()
}
207
208pub fn normalize_name(value: &str, strategy: &str) -> String {
209 match normalize_strategy_name(strategy).as_str() {
210 "snakecase" => to_snake_case(value),
211 "lower" => value.to_ascii_lowercase(),
212 "camelcase" => to_camel_case(value),
213 "none" => value.to_string(),
214 _ => value.to_string(),
215 }
216}
217
218fn to_snake_case(value: &str) -> String {
219 split_words(value).join("_")
220}
221
222fn to_camel_case(value: &str) -> String {
223 let words = split_words(value);
224 if words.is_empty() {
225 return String::new();
226 }
227 let mut out = String::new();
228 out.push_str(&words[0]);
229 for word in words.iter().skip(1) {
230 out.push_str(&capitalize(word));
231 }
232 out
233}
234
/// Splits an identifier into lower-cased ASCII words.
///
/// * Any character that is not ASCII alphanumeric acts as a separator and is
///   dropped (this includes non-ASCII letters).
/// * An ASCII uppercase letter starts a new word when it follows a lowercase
///   letter or a digit (`userId` -> `user`, `id`; `col2Name` -> `col2`,
///   `name`).
/// * Inside a run of uppercase letters, the last one starts a new word when
///   it is followed by a lowercase letter (`HTTPServer` -> `http`, `server`).
///
/// Empty words are never produced.
fn split_words(value: &str) -> Vec<String> {
    let mut words = Vec::new();
    let mut current = String::new();
    // The character immediately before `ch` in the input, separators included.
    let mut previous: Option<char> = None;
    let mut remaining = value.chars().peekable();

    while let Some(ch) = remaining.next() {
        let before = previous.replace(ch);

        if !ch.is_ascii_alphanumeric() {
            if !current.is_empty() {
                words.push(std::mem::take(&mut current));
            }
            continue;
        }

        if ch.is_ascii_uppercase() && !current.is_empty() {
            let after_lower_or_digit = matches!(
                before,
                Some(c) if c.is_ascii_lowercase() || c.is_ascii_digit()
            );
            // e.g. the `S` in `HTTPServer`: uppercase before, lowercase after.
            let closes_acronym = matches!(before, Some(c) if c.is_ascii_uppercase())
                && matches!(remaining.peek(), Some(c) if c.is_ascii_lowercase());

            if after_lower_or_digit || closes_acronym {
                words.push(std::mem::take(&mut current));
            }
        }

        current.push(ch.to_ascii_lowercase());
    }

    if !current.is_empty() {
        words.push(current);
    }

    words
}
273
/// Returns `value` with its first character ASCII-upper-cased and the rest
/// left untouched. A non-ASCII or non-letter first character is kept as-is;
/// an empty input yields an empty string.
fn capitalize(value: &str) -> String {
    let mut out = String::new();
    let mut rest = value.chars();
    if let Some(first) = rest.next() {
        out.push(first.to_ascii_uppercase());
        out.push_str(rest.as_str());
    }
    out
}