1use polars::prelude::{AnyValue, DataFrame, Series};
2use std::collections::{BTreeMap, HashMap, HashSet};
3
4use super::{ColumnIndex, RowError, SparseRowErrors};
5use crate::errors::RunError;
6use crate::{config, FloeResult};
7
/// Maximum number of duplicate-key samples included per constraint in
/// `UniqueTracker::results`.
const UNIQUE_SAMPLE_LIMIT: usize = 5;
9
/// Hashable, owned representation of a single non-null cell used when
/// checking uniqueness.
///
/// Floats are stored as the bit pattern of an `f64` so the type can derive
/// `Eq` and `Hash`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum UniqueKey {
    Bool(bool),
    I64(i64),
    U64(u64),
    F64(u64),
    String(String),
    Other(String),
}

impl UniqueKey {
    /// Renders the key as text for reporting; float bits are decoded back to
    /// an `f64` before formatting.
    fn as_string(&self) -> String {
        match self {
            Self::String(text) | Self::Other(text) => text.to_owned(),
            Self::F64(bits) => f64::from_bits(*bits).to_string(),
            Self::U64(number) => number.to_string(),
            Self::I64(number) => number.to_string(),
            Self::Bool(flag) => flag.to_string(),
        }
    }
}
31
/// Ordered list of per-column keys forming one row's value for a constraint;
/// element `i` corresponds to the constraint's `i`-th column.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct CompositeKey(Vec<UniqueKey>);
34
/// A uniqueness constraint over one or more columns.
#[derive(Debug, Clone)]
pub struct UniqueConstraint {
    /// Column names looked up in the `DataFrame` when building row keys.
    pub runtime_columns: Vec<String>,
    /// Column names used in error messages and in reported results/samples.
    pub report_columns: Vec<String>,
    /// When `true`, rows that violate this constraint are also added to the
    /// caller-supplied forced-reject set.
    pub enforce_reject: bool,
}
41
/// One sampled duplicate key for a constraint.
#[derive(Debug, Clone)]
pub struct UniqueConstraintSample {
    /// Report column name mapped to the stringified key value.
    pub values: BTreeMap<String, String>,
    /// Number of rows flagged as duplicates of this key (the first occurrence
    /// of a key is not flagged and therefore not counted).
    pub count: u64,
}
47
/// Aggregated outcome for a single uniqueness constraint.
#[derive(Debug, Clone)]
pub struct UniqueConstraintResult {
    /// The constraint's report column names.
    pub columns: Vec<String>,
    /// Total number of rows flagged as duplicates.
    pub duplicates_count: u64,
    /// Populated by `UniqueTracker::results` with the same value as
    /// `duplicates_count`.
    pub affected_rows_count: u64,
    /// Up to `UNIQUE_SAMPLE_LIMIT` duplicate keys, most frequent first.
    pub samples: Vec<UniqueConstraintSample>,
}
55
/// Mutable bookkeeping kept per constraint while data is processed.
#[derive(Debug, Clone)]
struct ConstraintState {
    /// The constraint being tracked.
    constraint: UniqueConstraint,
    /// Every key observed so far (seeded data and first occurrences).
    seen: HashSet<CompositeKey>,
    /// Number of rows flagged as duplicates so far.
    duplicates_count: u64,
    /// Per-key count of rows flagged as duplicates.
    sample_counts: HashMap<CompositeKey, u64>,
}
63
/// Tracks uniqueness constraints across one or more `DataFrame`s, remembering
/// previously seen keys between calls.
#[derive(Debug, Default)]
pub struct UniqueTracker {
    /// One state entry per configured constraint, in configuration order.
    states: Vec<ConstraintState>,
}
68
69impl UniqueTracker {
70 pub fn new(columns: &[config::ColumnConfig]) -> Self {
71 let constraints = legacy_unique_constraints(columns)
72 .into_iter()
73 .map(|column| UniqueConstraint {
74 runtime_columns: vec![column.clone()],
75 report_columns: vec![column],
76 enforce_reject: false,
77 })
78 .collect::<Vec<_>>();
79 Self::with_constraints(constraints)
80 }
81
82 pub fn with_constraints(constraints: Vec<UniqueConstraint>) -> Self {
83 let states = constraints
84 .into_iter()
85 .map(|constraint| ConstraintState {
86 constraint,
87 seen: HashSet::new(),
88 duplicates_count: 0,
89 sample_counts: HashMap::new(),
90 })
91 .collect();
92 Self { states }
93 }
94
95 pub fn is_empty(&self) -> bool {
96 self.states.is_empty()
97 }
98
99 pub fn runtime_columns(&self) -> Vec<String> {
100 let mut seen = HashSet::new();
101 let mut columns = Vec::new();
102 for state in &self.states {
103 for column in &state.constraint.runtime_columns {
104 if seen.insert(column.clone()) {
105 columns.push(column.clone());
106 }
107 }
108 }
109 columns
110 }
111
112 pub fn seed_from_df(&mut self, df: &DataFrame) -> FloeResult<()> {
113 if df.height() == 0 || self.states.is_empty() {
114 return Ok(());
115 }
116 for state in &mut self.states {
117 let columns = load_constraint_columns(df, &state.constraint.runtime_columns)?;
118 for row_idx in 0..df.height() {
119 let key = match composite_key_from_row(&columns, row_idx)? {
120 Some(key) => key,
121 None => continue,
122 };
123 state.seen.insert(key);
124 }
125 }
126 Ok(())
127 }
128
129 pub fn apply(
130 &mut self,
131 df: &DataFrame,
132 columns: &[config::ColumnConfig],
133 ) -> FloeResult<Vec<Vec<RowError>>> {
134 let mut errors_per_row = vec![Vec::new(); df.height()];
135 let sparse = self.apply_sparse(df, columns)?;
136 for (row_idx, row_errors) in sparse.iter() {
137 if let Some(slot) = errors_per_row.get_mut(*row_idx) {
138 slot.extend(row_errors.clone());
139 }
140 }
141 Ok(errors_per_row)
142 }
143
144 pub fn apply_sparse(
145 &mut self,
146 df: &DataFrame,
147 _columns: &[config::ColumnConfig],
148 ) -> FloeResult<SparseRowErrors> {
149 let mut forced_reject_rows = HashSet::new();
150 self.apply_sparse_with_forced_rejects(df, _columns, &mut forced_reject_rows)
151 }
152
153 pub fn apply_sparse_with_forced_rejects(
154 &mut self,
155 df: &DataFrame,
156 _columns: &[config::ColumnConfig],
157 forced_reject_rows: &mut HashSet<usize>,
158 ) -> FloeResult<SparseRowErrors> {
159 let mut errors = SparseRowErrors::new(df.height());
160 if df.height() == 0 || self.states.is_empty() {
161 return Ok(errors);
162 }
163
164 for state in &mut self.states {
165 let columns = load_constraint_columns(df, &state.constraint.runtime_columns)?;
166 let report_columns = state.constraint.report_columns.clone();
167 let (constraint_repr, message) = if report_columns.len() == 1 {
168 (report_columns[0].clone(), "duplicate value")
169 } else {
170 (format!("[{}]", report_columns.join(",")), "duplicate key")
171 };
172 for row_idx in 0..df.height() {
173 let key = match composite_key_from_row(&columns, row_idx)? {
174 Some(key) => key,
175 None => continue,
176 };
177 if state.seen.contains(&key) {
178 errors.add_error(row_idx, RowError::new("unique", &constraint_repr, message));
179 if state.constraint.enforce_reject {
180 forced_reject_rows.insert(row_idx);
181 }
182 state.duplicates_count += 1;
183 let counter = state.sample_counts.entry(key).or_insert(0);
184 *counter += 1;
185 } else {
186 state.seen.insert(key);
187 }
188 }
189 }
190
191 Ok(errors)
192 }
193
194 pub fn results(&self) -> Vec<UniqueConstraintResult> {
195 self.states
196 .iter()
197 .map(|state| {
198 let mut sample_counts = state
199 .sample_counts
200 .iter()
201 .map(|(key, count)| (key, *count))
202 .collect::<Vec<_>>();
203 sample_counts.sort_by(|left, right| {
204 right
205 .1
206 .cmp(&left.1)
207 .then_with(|| format!("{:?}", left.0).cmp(&format!("{:?}", right.0)))
208 });
209 let samples = sample_counts
210 .into_iter()
211 .take(UNIQUE_SAMPLE_LIMIT)
212 .map(|(key, count)| {
213 let mut values = BTreeMap::new();
214 for (idx, value) in key.0.iter().enumerate() {
215 if let Some(column_name) = state.constraint.report_columns.get(idx) {
216 values.insert(column_name.clone(), value.as_string());
217 }
218 }
219 UniqueConstraintSample { values, count }
220 })
221 .collect::<Vec<_>>();
222 UniqueConstraintResult {
223 columns: state.constraint.report_columns.clone(),
224 duplicates_count: state.duplicates_count,
225 affected_rows_count: state.duplicates_count,
226 samples,
227 }
228 })
229 .collect()
230 }
231}
232
233pub fn unique_errors(
234 df: &DataFrame,
235 columns: &[config::ColumnConfig],
236 _indices: &ColumnIndex,
237) -> FloeResult<Vec<Vec<RowError>>> {
238 let mut tracker = UniqueTracker::new(columns);
239 tracker.apply(df, columns)
240}
241
242pub fn unique_errors_sparse(
243 df: &DataFrame,
244 columns: &[config::ColumnConfig],
245 _indices: &ColumnIndex,
246) -> FloeResult<SparseRowErrors> {
247 let mut tracker = UniqueTracker::new(columns);
248 tracker.apply_sparse(df, columns)
249}
250
251pub fn unique_counts(
252 df: &DataFrame,
253 columns: &[config::ColumnConfig],
254) -> FloeResult<Vec<(String, u64)>> {
255 if df.height() == 0 {
256 return Ok(Vec::new());
257 }
258
259 let unique_columns: Vec<&config::ColumnConfig> = columns
260 .iter()
261 .filter(|col| col.unique == Some(true))
262 .collect();
263 if unique_columns.is_empty() {
264 return Ok(Vec::new());
265 }
266
267 let mut counts = Vec::new();
268 for column in unique_columns {
269 let series = df.column(&column.name).map_err(|err| {
270 Box::new(RunError(format!(
271 "unique column {} not found: {err}",
272 column.name
273 )))
274 })?;
275 let non_null = series.len().saturating_sub(series.null_count());
276 if non_null == 0 {
277 continue;
278 }
279 let unique = series.drop_nulls().n_unique().map_err(|err| {
280 Box::new(RunError(format!(
281 "unique column {} read failed: {err}",
282 column.name
283 )))
284 })?;
285 let violations = non_null.saturating_sub(unique) as u64;
286 if violations > 0 {
287 counts.push((column.name.clone(), violations));
288 }
289 }
290
291 Ok(counts)
292}
293
294pub fn resolve_schema_unique_keys(schema: &config::SchemaConfig) -> Vec<Vec<String>> {
295 let mut seen = HashSet::new();
296 let mut constraints = Vec::new();
297
298 if let Some(unique_keys) = schema.unique_keys.as_ref() {
299 for key in unique_keys {
300 let normalized = key
301 .iter()
302 .map(|column| column.trim().to_string())
303 .collect::<Vec<_>>();
304 if normalized.is_empty() {
305 continue;
306 }
307 let signature = normalized.join("\u{1f}");
308 if seen.insert(signature) {
309 constraints.push(normalized);
310 }
311 }
312 } else {
313 for column in legacy_unique_constraints(&schema.columns) {
314 let constraint = vec![column];
315 let signature = constraint.join("\u{1f}");
316 if seen.insert(signature) {
317 constraints.push(constraint);
318 }
319 }
320 }
321
322 if let Some(primary_key) = schema.primary_key.as_ref() {
323 let normalized = primary_key
324 .iter()
325 .map(|column| column.trim().to_string())
326 .collect::<Vec<_>>();
327 if !normalized.is_empty() {
328 let signature = normalized.join("\u{1f}");
329 if seen.insert(signature) {
330 constraints.push(normalized);
331 }
332 }
333 }
334
335 constraints
336}
337
338fn legacy_unique_constraints(columns: &[config::ColumnConfig]) -> Vec<String> {
339 columns
340 .iter()
341 .filter(|col| col.unique == Some(true))
342 .map(|col| col.name.trim().to_string())
343 .filter(|name| !name.is_empty())
344 .collect()
345}
346
347fn load_constraint_columns(df: &DataFrame, columns: &[String]) -> FloeResult<Vec<Series>> {
348 let mut output = Vec::with_capacity(columns.len());
349 for column in columns {
350 let series = df.column(column).map_err(|err| {
351 Box::new(RunError(format!(
352 "unique constraint column {} not found: {err}",
353 column
354 )))
355 })?;
356 output.push(series.as_materialized_series().rechunk());
357 }
358 Ok(output)
359}
360
361fn composite_key_from_row(columns: &[Series], row_idx: usize) -> FloeResult<Option<CompositeKey>> {
362 let mut key = Vec::with_capacity(columns.len());
363 for series in columns {
364 let value = series.get(row_idx).map_err(|err| {
365 Box::new(RunError(format!(
366 "unique constraint read failed at row {}: {err}",
367 row_idx
368 )))
369 })?;
370 let Some(value) = unique_key(value) else {
371 return Ok(None);
372 };
373 key.push(value);
374 }
375 Ok(Some(CompositeKey(key)))
376}
377
378fn unique_key(value: AnyValue) -> Option<UniqueKey> {
379 match value {
380 AnyValue::Null => None,
381 AnyValue::Boolean(value) => Some(UniqueKey::Bool(value)),
382 AnyValue::Int8(value) => Some(UniqueKey::I64(value as i64)),
383 AnyValue::Int16(value) => Some(UniqueKey::I64(value as i64)),
384 AnyValue::Int32(value) => Some(UniqueKey::I64(value as i64)),
385 AnyValue::Int64(value) => Some(UniqueKey::I64(value)),
386 AnyValue::Int128(value) => Some(UniqueKey::Other(value.to_string())),
387 AnyValue::UInt8(value) => Some(UniqueKey::U64(value as u64)),
388 AnyValue::UInt16(value) => Some(UniqueKey::U64(value as u64)),
389 AnyValue::UInt32(value) => Some(UniqueKey::U64(value as u64)),
390 AnyValue::UInt64(value) => Some(UniqueKey::U64(value)),
391 AnyValue::UInt128(value) => Some(UniqueKey::Other(value.to_string())),
392 AnyValue::Float32(value) => Some(UniqueKey::F64((value as f64).to_bits())),
393 AnyValue::Float64(value) => Some(UniqueKey::F64(value.to_bits())),
394 AnyValue::String(value) => Some(UniqueKey::String(value.to_string())),
395 AnyValue::StringOwned(value) => Some(UniqueKey::String(value.to_string())),
396 other => Some(UniqueKey::Other(other.to_string())),
397 }
398}