Skip to main content

// floe_core/checks/normalize.rs

1use std::collections::{HashMap, HashSet};
2
3use polars::prelude::DataFrame;
4
5use crate::{config, ConfigError, FloeResult};
6
7pub fn resolve_normalize_strategy(entity: &config::EntityConfig) -> FloeResult<Option<String>> {
8    let normalize = match &entity.schema.normalize_columns {
9        Some(config) => config.enabled.unwrap_or(false),
10        None => false,
11    };
12    if !normalize {
13        return Ok(None);
14    }
15    let raw = entity
16        .schema
17        .normalize_columns
18        .as_ref()
19        .and_then(|config| config.strategy.as_deref())
20        .unwrap_or("snake_case");
21    let normalized = normalize_strategy_name(raw);
22    match normalized.as_str() {
23        "snakecase" | "lower" | "camelcase" | "none" => Ok(Some(normalized)),
24        _ => Err(Box::new(ConfigError(format!(
25            "unsupported normalize_columns.strategy: {raw}"
26        )))),
27    }
28}
29
30pub fn resolve_source_columns(
31    columns: &[config::ColumnConfig],
32    strategy: Option<&str>,
33    keep_sources: bool,
34) -> FloeResult<Vec<config::ColumnConfig>> {
35    let mut resolved = Vec::with_capacity(columns.len());
36    let mut seen = HashMap::new();
37    for column in columns {
38        let source_name = column.source_or_name();
39        let normalized_name = if let Some(strategy) = strategy {
40            normalize_name(source_name, strategy)
41        } else {
42            source_name.to_string()
43        };
44        if let Some(existing) = seen.insert(normalized_name.clone(), source_name.to_string()) {
45            return Err(Box::new(ConfigError(format!(
46                "column source collision: {} and {} -> {}",
47                existing, column.name, normalized_name
48            ))));
49        }
50        resolved.push(config::ColumnConfig {
51            name: normalized_name,
52            source: if keep_sources {
53                Some(source_name.to_string())
54            } else {
55                None
56            },
57            column_type: column.column_type.clone(),
58            nullable: column.nullable,
59            unique: column.unique,
60            width: column.width,
61            trim: column.trim,
62        });
63    }
64    Ok(resolved)
65}
66
67pub fn source_column_mapping(
68    columns: &[config::ColumnConfig],
69    strategy: Option<&str>,
70) -> FloeResult<HashMap<String, String>> {
71    let mut mapping = HashMap::new();
72    let mut seen = HashSet::new();
73    for column in columns {
74        let Some(source) = column.source.as_deref() else {
75            continue;
76        };
77        let normalized = if let Some(strategy) = strategy {
78            normalize_name(source, strategy)
79        } else {
80            source.to_string()
81        };
82        if !seen.insert(normalized.clone()) {
83            return Err(Box::new(ConfigError(format!(
84                "column source collision: duplicate source selector {}",
85                normalized
86            ))));
87        }
88        mapping.insert(normalized, source.to_string());
89    }
90    Ok(mapping)
91}
92
93pub fn output_column_mapping(
94    columns: &[config::ColumnConfig],
95    strategy: Option<&str>,
96) -> FloeResult<HashMap<String, String>> {
97    let mut mapping = HashMap::new();
98    let mut targets = HashMap::new();
99    for column in columns {
100        let source_name = column.source_or_name();
101        let normalized_source = if let Some(strategy) = strategy {
102            normalize_name(source_name, strategy)
103        } else {
104            source_name.to_string()
105        };
106        let target_name = if column.source.is_some() {
107            column.name.clone()
108        } else if let Some(strategy) = strategy {
109            normalize_name(&column.name, strategy)
110        } else {
111            column.name.clone()
112        };
113        if let Some(existing) = targets.insert(target_name.clone(), normalized_source.clone()) {
114            return Err(Box::new(ConfigError(format!(
115                "output column name collision: {} and {} -> {}",
116                existing, normalized_source, target_name
117            ))));
118        }
119        if normalized_source != target_name {
120            mapping.insert(normalized_source, target_name);
121        }
122    }
123    Ok(mapping)
124}
125
126/// Maps schema column names (what PII config references) to their runtime
127/// column names (the post-rename names that exist in the DataFrame when
128/// apply_pii_masking runs). Only contains entries where the schema name
129/// differs from the runtime name, i.e. when normalization changes the name.
130pub fn pii_schema_to_runtime_mapping(
131    columns: &[config::ColumnConfig],
132    strategy: Option<&str>,
133) -> HashMap<String, String> {
134    let mut mapping = HashMap::new();
135    for column in columns {
136        let runtime_name = if column.source.is_some() {
137            column.name.clone()
138        } else if let Some(strategy) = strategy {
139            normalize_name(&column.name, strategy)
140        } else {
141            column.name.clone()
142        };
143        if column.name != runtime_name {
144            mapping.insert(column.name.clone(), runtime_name);
145        }
146    }
147    mapping
148}
149
150pub fn resolve_output_columns(
151    columns: &[config::ColumnConfig],
152    strategy: Option<&str>,
153) -> Vec<config::ColumnConfig> {
154    columns
155        .iter()
156        .map(|column| {
157            let name = if column.source.is_some() {
158                column.name.clone()
159            } else if let Some(strategy) = strategy {
160                normalize_name(&column.name, strategy)
161            } else {
162                column.name.clone()
163            };
164            config::ColumnConfig {
165                name,
166                source: None,
167                column_type: column.column_type.clone(),
168                nullable: column.nullable,
169                unique: column.unique,
170                width: column.width,
171                trim: column.trim,
172            }
173        })
174        .collect()
175}
176
177pub fn rename_output_columns(
178    df: &mut DataFrame,
179    mapping: &HashMap<String, String>,
180) -> FloeResult<()> {
181    if mapping.is_empty() {
182        return Ok(());
183    }
184    let names = df
185        .get_column_names()
186        .iter()
187        .map(|name| name.to_string())
188        .collect::<Vec<_>>();
189    let mut renamed = Vec::with_capacity(names.len());
190    for name in &names {
191        if let Some(target) = mapping.get(name) {
192            renamed.push(target.clone());
193        } else {
194            renamed.push(name.clone());
195        }
196    }
197    df.set_column_names(renamed.iter()).map_err(|err| {
198        Box::new(ConfigError(format!(
199            "failed to rename output columns: {err}"
200        )))
201    })?;
202    Ok(())
203}
204
205pub fn normalize_dataframe_columns(df: &mut DataFrame, strategy: &str) -> FloeResult<()> {
206    let names = df.get_column_names();
207    let mut normalized_names = Vec::with_capacity(names.len());
208    let mut seen = HashMap::new();
209    for name in names {
210        let normalized = normalize_name(name, strategy);
211        if let Some(existing) = seen.insert(normalized.clone(), name.to_string()) {
212            return Err(Box::new(ConfigError(format!(
213                "normalized input column collision: {} and {} -> {}",
214                existing, name, normalized
215            ))));
216        }
217        normalized_names.push(normalized);
218    }
219    df.set_column_names(normalized_names.iter())
220        .map_err(|err| {
221            Box::new(ConfigError(format!(
222                "failed to normalize column names: {err}"
223            )))
224        })?;
225    Ok(())
226}
227
/// Canonicalizes a strategy name: ASCII letters are lowercased and every
/// `-` and `_` is removed, so `"snake_case"`, `"Snake-Case"` and
/// `"snakecase"` all become `"snakecase"`.
fn normalize_strategy_name(value: &str) -> String {
    value
        .chars()
        .filter(|&ch| ch != '-' && ch != '_')
        .map(|ch| ch.to_ascii_lowercase())
        .collect()
}
231
232pub fn normalize_name(value: &str, strategy: &str) -> String {
233    match normalize_strategy_name(strategy).as_str() {
234        "snakecase" => to_snake_case(value),
235        "lower" => value.to_ascii_lowercase(),
236        "camelcase" => to_camel_case(value),
237        "none" => value.to_string(),
238        _ => value.to_string(),
239    }
240}
241
242fn to_snake_case(value: &str) -> String {
243    split_words(value).join("_")
244}
245
246fn to_camel_case(value: &str) -> String {
247    let words = split_words(value);
248    if words.is_empty() {
249        return String::new();
250    }
251    let mut out = String::new();
252    out.push_str(&words[0]);
253    for word in words.iter().skip(1) {
254        out.push_str(&capitalize(word));
255    }
256    out
257}
258
/// Splits an identifier into lowercase words.
///
/// Any character that is not ASCII alphanumeric acts as a separator and is
/// dropped. Inside a run of alphanumerics, a new word starts at an uppercase
/// letter when it follows a lowercase letter or a digit (`userId`, `v2Name`),
/// or when it follows an uppercase letter and is itself followed by a
/// lowercase letter (the `S` in `HTTPServer`).
fn split_words(value: &str) -> Vec<String> {
    let mut words: Vec<String> = Vec::new();
    let mut word = String::new();
    let mut previous: Option<char> = None;
    let mut rest = value.chars().peekable();

    while let Some(ch) = rest.next() {
        // `before` is the character immediately preceding `ch`, if any.
        let before = previous.replace(ch);

        if !ch.is_ascii_alphanumeric() {
            if !word.is_empty() {
                words.push(std::mem::take(&mut word));
            }
            continue;
        }

        if ch.is_ascii_uppercase() && !word.is_empty() {
            let after_lower_or_digit =
                matches!(before, Some(p) if p.is_ascii_lowercase() || p.is_ascii_digit());
            // Last capital of an acronym that begins the next word.
            let closes_acronym = matches!(before, Some(p) if p.is_ascii_uppercase())
                && matches!(rest.peek(), Some(n) if n.is_ascii_lowercase());
            if after_lower_or_digit || closes_acronym {
                words.push(std::mem::take(&mut word));
            }
        }

        word.push(ch.to_ascii_lowercase());
    }

    if !word.is_empty() {
        words.push(word);
    }
    words
}
297
/// Returns `value` with its first character ASCII-uppercased and the rest
/// unchanged. An empty input yields an empty string.
fn capitalize(value: &str) -> String {
    let mut rest = value.chars();
    let Some(first) = rest.next() else {
        return String::new();
    };
    let mut out = String::with_capacity(value.len());
    out.push(first.to_ascii_uppercase());
    out.push_str(rest.as_str());
    out
}