Skip to main content

floe_core/checks/
normalize.rs

1use std::collections::{HashMap, HashSet};
2
3use polars::prelude::DataFrame;
4
5use crate::{config, ConfigError, FloeResult};
6
7pub fn resolve_normalize_strategy(entity: &config::EntityConfig) -> FloeResult<Option<String>> {
8    let normalize = match &entity.schema.normalize_columns {
9        Some(config) => config.enabled.unwrap_or(false),
10        None => false,
11    };
12    if !normalize {
13        return Ok(None);
14    }
15    let raw = entity
16        .schema
17        .normalize_columns
18        .as_ref()
19        .and_then(|config| config.strategy.as_deref())
20        .unwrap_or("snake_case");
21    let normalized = normalize_strategy_name(raw);
22    match normalized.as_str() {
23        "snakecase" | "lower" | "camelcase" | "none" => Ok(Some(normalized)),
24        _ => Err(Box::new(ConfigError(format!(
25            "unsupported normalize_columns.strategy: {raw}"
26        )))),
27    }
28}
29
30pub fn resolve_source_columns(
31    columns: &[config::ColumnConfig],
32    strategy: Option<&str>,
33    keep_sources: bool,
34) -> FloeResult<Vec<config::ColumnConfig>> {
35    let mut resolved = Vec::with_capacity(columns.len());
36    let mut seen = HashMap::new();
37    for column in columns {
38        let source_name = column.source_or_name();
39        let normalized_name = if let Some(strategy) = strategy {
40            normalize_name(source_name, strategy)
41        } else {
42            source_name.to_string()
43        };
44        if let Some(existing) = seen.insert(normalized_name.clone(), source_name.to_string()) {
45            return Err(Box::new(ConfigError(format!(
46                "column source collision: {} and {} -> {}",
47                existing, column.name, normalized_name
48            ))));
49        }
50        resolved.push(config::ColumnConfig {
51            name: normalized_name,
52            source: if keep_sources {
53                Some(source_name.to_string())
54            } else {
55                None
56            },
57            column_type: column.column_type.clone(),
58            nullable: column.nullable,
59            unique: column.unique,
60            width: column.width,
61            trim: column.trim,
62        });
63    }
64    Ok(resolved)
65}
66
67pub fn source_column_mapping(
68    columns: &[config::ColumnConfig],
69    strategy: Option<&str>,
70) -> FloeResult<HashMap<String, String>> {
71    let mut mapping = HashMap::new();
72    let mut seen = HashSet::new();
73    for column in columns {
74        let Some(source) = column.source.as_deref() else {
75            continue;
76        };
77        let normalized = if let Some(strategy) = strategy {
78            normalize_name(source, strategy)
79        } else {
80            source.to_string()
81        };
82        if !seen.insert(normalized.clone()) {
83            return Err(Box::new(ConfigError(format!(
84                "column source collision: duplicate source selector {}",
85                normalized
86            ))));
87        }
88        mapping.insert(normalized, source.to_string());
89    }
90    Ok(mapping)
91}
92
93pub fn output_column_mapping(
94    columns: &[config::ColumnConfig],
95    strategy: Option<&str>,
96) -> FloeResult<HashMap<String, String>> {
97    let mut mapping = HashMap::new();
98    let mut targets = HashMap::new();
99    for column in columns {
100        let source_name = column.source_or_name();
101        let normalized_source = if let Some(strategy) = strategy {
102            normalize_name(source_name, strategy)
103        } else {
104            source_name.to_string()
105        };
106        let target_name = if column.source.is_some() {
107            column.name.clone()
108        } else if let Some(strategy) = strategy {
109            normalize_name(&column.name, strategy)
110        } else {
111            column.name.clone()
112        };
113        if let Some(existing) = targets.insert(target_name.clone(), normalized_source.clone()) {
114            return Err(Box::new(ConfigError(format!(
115                "output column name collision: {} and {} -> {}",
116                existing, normalized_source, target_name
117            ))));
118        }
119        if normalized_source != target_name {
120            mapping.insert(normalized_source, target_name);
121        }
122    }
123    Ok(mapping)
124}
125
126pub fn resolve_output_columns(
127    columns: &[config::ColumnConfig],
128    strategy: Option<&str>,
129) -> Vec<config::ColumnConfig> {
130    columns
131        .iter()
132        .map(|column| {
133            let name = if column.source.is_some() {
134                column.name.clone()
135            } else if let Some(strategy) = strategy {
136                normalize_name(&column.name, strategy)
137            } else {
138                column.name.clone()
139            };
140            config::ColumnConfig {
141                name,
142                source: None,
143                column_type: column.column_type.clone(),
144                nullable: column.nullable,
145                unique: column.unique,
146                width: column.width,
147                trim: column.trim,
148            }
149        })
150        .collect()
151}
152
153pub fn rename_output_columns(
154    df: &mut DataFrame,
155    mapping: &HashMap<String, String>,
156) -> FloeResult<()> {
157    if mapping.is_empty() {
158        return Ok(());
159    }
160    let names = df
161        .get_column_names()
162        .iter()
163        .map(|name| name.to_string())
164        .collect::<Vec<_>>();
165    let mut renamed = Vec::with_capacity(names.len());
166    for name in &names {
167        if let Some(target) = mapping.get(name) {
168            renamed.push(target.clone());
169        } else {
170            renamed.push(name.clone());
171        }
172    }
173    df.set_column_names(renamed.iter()).map_err(|err| {
174        Box::new(ConfigError(format!(
175            "failed to rename output columns: {err}"
176        )))
177    })?;
178    Ok(())
179}
180
181pub fn normalize_dataframe_columns(df: &mut DataFrame, strategy: &str) -> FloeResult<()> {
182    let names = df.get_column_names();
183    let mut normalized_names = Vec::with_capacity(names.len());
184    let mut seen = HashMap::new();
185    for name in names {
186        let normalized = normalize_name(name, strategy);
187        if let Some(existing) = seen.insert(normalized.clone(), name.to_string()) {
188            return Err(Box::new(ConfigError(format!(
189                "normalized input column collision: {} and {} -> {}",
190                existing, name, normalized
191            ))));
192        }
193        normalized_names.push(normalized);
194    }
195    df.set_column_names(normalized_names.iter())
196        .map_err(|err| {
197            Box::new(ConfigError(format!(
198                "failed to normalize column names: {err}"
199            )))
200        })?;
201    Ok(())
202}
203
/// Canonicalizes a strategy identifier: ASCII letters are lowercased and
/// every `-` and `_` is removed, so `"snake_case"`, `"Snake-Case"` and
/// `"snakecase"` all yield `"snakecase"`.
fn normalize_strategy_name(value: &str) -> String {
    value
        .chars()
        .filter(|ch| !matches!(ch, '-' | '_'))
        .map(|ch| ch.to_ascii_lowercase())
        .collect()
}
207
208pub fn normalize_name(value: &str, strategy: &str) -> String {
209    match normalize_strategy_name(strategy).as_str() {
210        "snakecase" => to_snake_case(value),
211        "lower" => value.to_ascii_lowercase(),
212        "camelcase" => to_camel_case(value),
213        "none" => value.to_string(),
214        _ => value.to_string(),
215    }
216}
217
218fn to_snake_case(value: &str) -> String {
219    split_words(value).join("_")
220}
221
222fn to_camel_case(value: &str) -> String {
223    let words = split_words(value);
224    if words.is_empty() {
225        return String::new();
226    }
227    let mut out = String::new();
228    out.push_str(&words[0]);
229    for word in words.iter().skip(1) {
230        out.push_str(&capitalize(word));
231    }
232    out
233}
234
/// Splits `value` into lowercase words.
///
/// Rules, applied character by character:
/// * any character that is not ASCII alphanumeric acts as a separator and is
///   dropped (this includes non-ASCII letters);
/// * an ASCII uppercase letter starts a new word when the character just
///   before it is an ASCII lowercase letter or digit (`fooBar`, `a1B`);
/// * an ASCII uppercase letter also starts a new word when it follows another
///   uppercase letter and is itself followed by a lowercase letter, so the
///   last capital of an acronym run begins the next word (`HTTPServer` ->
///   `http`, `server`).
///
/// Every kept character is ASCII-lowercased. Empty words are never emitted.
fn split_words(value: &str) -> Vec<String> {
    let mut words: Vec<String> = Vec::new();
    let mut buffer = String::new();
    // The character immediately before `ch` in `value`, separators included.
    let mut previous: Option<char> = None;
    let mut rest = value.chars().peekable();

    while let Some(ch) = rest.next() {
        let before = previous;
        previous = Some(ch);

        if !ch.is_ascii_alphanumeric() {
            if !buffer.is_empty() {
                words.push(std::mem::take(&mut buffer));
            }
            continue;
        }

        if ch.is_ascii_uppercase() && !buffer.is_empty() {
            let after_lower_or_digit =
                matches!(before, Some(c) if c.is_ascii_lowercase() || c.is_ascii_digit());
            let closes_acronym = matches!(before, Some(c) if c.is_ascii_uppercase())
                && matches!(rest.peek(), Some(c) if c.is_ascii_lowercase());
            if after_lower_or_digit || closes_acronym {
                words.push(std::mem::take(&mut buffer));
            }
        }

        buffer.push(ch.to_ascii_lowercase());
    }

    if !buffer.is_empty() {
        words.push(buffer);
    }
    words
}
273
/// Returns `value` with its first character ASCII-uppercased and the rest
/// left unchanged. An empty input yields an empty string; a non-ASCII first
/// character is kept as is.
fn capitalize(value: &str) -> String {
    let mut rest = value.chars();
    let Some(head) = rest.next() else {
        return String::new();
    };
    let mut out = String::with_capacity(value.len());
    out.push(head.to_ascii_uppercase());
    out.push_str(rest.as_str());
    out
}