1use polars::prelude::{AnyValue, DataFrame, Series};
2use std::collections::{BTreeMap, HashMap, HashSet};
3
4use super::{ColumnIndex, RowError, SparseRowErrors};
5use crate::errors::RunError;
6use crate::{config, FloeResult};
7
/// Maximum number of duplicate-key samples reported per constraint by
/// `UniqueTracker::results`.
const UNIQUE_SAMPLE_LIMIT: usize = 5;
9
/// Hashable, owned representation of a single non-null cell value.
///
/// Variants are compared structurally, so a `String` and an `Other` holding
/// the same text are different keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum UniqueKey {
    /// Boolean cell.
    Bool(bool),
    /// Signed integer cell.
    I64(i64),
    /// Unsigned integer cell.
    U64(u64),
    /// Floating-point cell stored as its raw `f64` bit pattern (decoded with
    /// `f64::from_bits`), which keeps the key `Eq + Hash`.
    F64(u64),
    /// Text cell.
    String(String),
    /// Any other value, kept as text.
    Other(String),
}

impl UniqueKey {
    /// Renders the key as display text.
    fn as_string(&self) -> String {
        match self {
            // Text-backed variants already hold their rendering.
            Self::String(text) | Self::Other(text) => text.clone(),
            Self::Bool(flag) => flag.to_string(),
            Self::I64(number) => number.to_string(),
            Self::U64(number) => number.to_string(),
            // Decode the stored bit pattern back into a float before printing.
            Self::F64(bits) => f64::from_bits(*bits).to_string(),
        }
    }
}
31
/// Key of one row under one constraint: the per-column [`UniqueKey`]s in the
/// order the constraint's columns were loaded (see `composite_key_from_row`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct CompositeKey(Vec<UniqueKey>);
34
/// A uniqueness constraint over one or more columns.
#[derive(Debug, Clone)]
pub struct UniqueConstraint {
    /// Column names looked up in the data frame to build each row's key.
    pub runtime_columns: Vec<String>,
    /// Column names used in row-error text and in reported results/samples.
    pub report_columns: Vec<String>,
    /// When `true`, rows that violate the constraint are also added to the
    /// caller-supplied forced-reject set.
    pub enforce_reject: bool,
}
41
/// One duplicated key reported as an example for a constraint.
#[derive(Debug, Clone)]
pub struct UniqueConstraintSample {
    /// Report column name mapped to the rendered value of that key component.
    pub values: BTreeMap<String, String>,
    /// Number of rows flagged as duplicates of this key.
    pub count: u64,
}
47
/// Aggregated outcome for one constraint, as produced by
/// `UniqueTracker::results`.
#[derive(Debug, Clone)]
pub struct UniqueConstraintResult {
    /// The constraint's report columns.
    pub columns: Vec<String>,
    /// Total number of rows flagged as duplicates.
    pub duplicates_count: u64,
    /// Duplicates whose key was not among the seeded keys.
    pub batch_duplicates_count: u64,
    /// Duplicates whose key was among the seeded keys.
    pub target_duplicates_count: u64,
    /// Currently populated with the same value as `duplicates_count`.
    pub affected_rows_count: u64,
    /// Up to `UNIQUE_SAMPLE_LIMIT` duplicated keys, most frequent first.
    pub samples: Vec<UniqueConstraintSample>,
}
57
/// Mutable bookkeeping for a single constraint inside a `UniqueTracker`.
#[derive(Debug, Clone)]
struct ConstraintState {
    /// The constraint being enforced.
    constraint: UniqueConstraint,
    /// Every key observed so far, whether seeded or encountered while applying.
    seen: HashSet<CompositeKey>,
    /// Keys inserted by `UniqueTracker::seed_from_df`; used to classify a
    /// duplicate as a "target" duplicate rather than a "batch" duplicate.
    seeded_keys: HashSet<CompositeKey>,
    /// Total number of rows flagged as duplicates.
    duplicates_count: u64,
    /// Duplicates whose key was not seeded.
    batch_duplicates_count: u64,
    /// Duplicates whose key was seeded.
    target_duplicates_count: u64,
    /// Per-key count of flagged duplicate rows, used to build samples.
    sample_counts: HashMap<CompositeKey, u64>,
}
68
/// Tracks uniqueness constraints across one or more data frames, remembering
/// previously seen keys between calls.
#[derive(Debug, Default)]
pub struct UniqueTracker {
    /// One state entry per constraint, in the order the constraints were given.
    states: Vec<ConstraintState>,
}
73
74impl UniqueTracker {
75 pub fn new(columns: &[config::ColumnConfig]) -> Self {
76 let constraints = legacy_unique_constraints(columns)
77 .into_iter()
78 .map(|column| UniqueConstraint {
79 runtime_columns: vec![column.clone()],
80 report_columns: vec![column],
81 enforce_reject: false,
82 })
83 .collect::<Vec<_>>();
84 Self::with_constraints(constraints)
85 }
86
87 pub fn with_constraints(constraints: Vec<UniqueConstraint>) -> Self {
88 let states = constraints
89 .into_iter()
90 .map(|constraint| ConstraintState {
91 constraint,
92 seen: HashSet::new(),
93 seeded_keys: HashSet::new(),
94 duplicates_count: 0,
95 batch_duplicates_count: 0,
96 target_duplicates_count: 0,
97 sample_counts: HashMap::new(),
98 })
99 .collect();
100 Self { states }
101 }
102
103 pub fn is_empty(&self) -> bool {
104 self.states.is_empty()
105 }
106
107 pub fn runtime_columns(&self) -> Vec<String> {
108 let mut seen = HashSet::new();
109 let mut columns = Vec::new();
110 for state in &self.states {
111 for column in &state.constraint.runtime_columns {
112 if seen.insert(column.clone()) {
113 columns.push(column.clone());
114 }
115 }
116 }
117 columns
118 }
119
120 pub fn seed_from_df(&mut self, df: &DataFrame) -> FloeResult<()> {
121 if df.height() == 0 || self.states.is_empty() {
122 return Ok(());
123 }
124 for state in &mut self.states {
125 let columns = load_constraint_columns(df, &state.constraint.runtime_columns)?;
126 for row_idx in 0..df.height() {
127 let key = match composite_key_from_row(&columns, row_idx)? {
128 Some(key) => key,
129 None => continue,
130 };
131 state.seeded_keys.insert(key.clone());
132 state.seen.insert(key);
133 }
134 }
135 Ok(())
136 }
137
138 pub fn apply(
139 &mut self,
140 df: &DataFrame,
141 columns: &[config::ColumnConfig],
142 ) -> FloeResult<Vec<Vec<RowError>>> {
143 let mut errors_per_row = vec![Vec::new(); df.height()];
144 let sparse = self.apply_sparse(df, columns)?;
145 for (row_idx, row_errors) in sparse.iter() {
146 if let Some(slot) = errors_per_row.get_mut(*row_idx) {
147 slot.extend(row_errors.clone());
148 }
149 }
150 Ok(errors_per_row)
151 }
152
153 pub fn apply_sparse(
154 &mut self,
155 df: &DataFrame,
156 _columns: &[config::ColumnConfig],
157 ) -> FloeResult<SparseRowErrors> {
158 let mut forced_reject_rows = HashSet::new();
159 self.apply_sparse_with_forced_rejects(df, _columns, &mut forced_reject_rows)
160 }
161
162 pub fn apply_sparse_with_forced_rejects(
163 &mut self,
164 df: &DataFrame,
165 _columns: &[config::ColumnConfig],
166 forced_reject_rows: &mut HashSet<usize>,
167 ) -> FloeResult<SparseRowErrors> {
168 let mut errors = SparseRowErrors::new(df.height());
169 if df.height() == 0 || self.states.is_empty() {
170 return Ok(errors);
171 }
172
173 for state in &mut self.states {
174 let columns = load_constraint_columns(df, &state.constraint.runtime_columns)?;
175 let report_columns = state.constraint.report_columns.clone();
176 let (constraint_repr, message) = if report_columns.len() == 1 {
177 (report_columns[0].clone(), "duplicate value")
178 } else {
179 (format!("[{}]", report_columns.join(",")), "duplicate key")
180 };
181 for row_idx in 0..df.height() {
182 let key = match composite_key_from_row(&columns, row_idx)? {
183 Some(key) => key,
184 None => continue,
185 };
186 if state.seen.contains(&key) {
187 errors.add_error(row_idx, RowError::new("unique", &constraint_repr, message));
188 if state.constraint.enforce_reject {
189 forced_reject_rows.insert(row_idx);
190 }
191 state.duplicates_count += 1;
192 if state.seeded_keys.contains(&key) {
193 state.target_duplicates_count += 1;
194 } else {
195 state.batch_duplicates_count += 1;
196 }
197 let counter = state.sample_counts.entry(key).or_insert(0);
198 *counter += 1;
199 } else {
200 state.seen.insert(key);
201 }
202 }
203 }
204
205 Ok(errors)
206 }
207
208 pub fn results(&self) -> Vec<UniqueConstraintResult> {
209 self.states
210 .iter()
211 .map(|state| {
212 let mut sample_counts = state
213 .sample_counts
214 .iter()
215 .map(|(key, count)| (key, *count))
216 .collect::<Vec<_>>();
217 sample_counts.sort_by(|left, right| {
218 right
219 .1
220 .cmp(&left.1)
221 .then_with(|| format!("{:?}", left.0).cmp(&format!("{:?}", right.0)))
222 });
223 let samples = sample_counts
224 .into_iter()
225 .take(UNIQUE_SAMPLE_LIMIT)
226 .map(|(key, count)| {
227 let mut values = BTreeMap::new();
228 for (idx, value) in key.0.iter().enumerate() {
229 if let Some(column_name) = state.constraint.report_columns.get(idx) {
230 values.insert(column_name.clone(), value.as_string());
231 }
232 }
233 UniqueConstraintSample { values, count }
234 })
235 .collect::<Vec<_>>();
236 UniqueConstraintResult {
237 columns: state.constraint.report_columns.clone(),
238 duplicates_count: state.duplicates_count,
239 batch_duplicates_count: state.batch_duplicates_count,
240 target_duplicates_count: state.target_duplicates_count,
241 affected_rows_count: state.duplicates_count,
242 samples,
243 }
244 })
245 .collect()
246 }
247}
248
249pub fn unique_errors(
250 df: &DataFrame,
251 columns: &[config::ColumnConfig],
252 _indices: &ColumnIndex,
253) -> FloeResult<Vec<Vec<RowError>>> {
254 let mut tracker = UniqueTracker::new(columns);
255 tracker.apply(df, columns)
256}
257
258pub fn unique_errors_sparse(
259 df: &DataFrame,
260 columns: &[config::ColumnConfig],
261 _indices: &ColumnIndex,
262) -> FloeResult<SparseRowErrors> {
263 let mut tracker = UniqueTracker::new(columns);
264 tracker.apply_sparse(df, columns)
265}
266
267pub fn unique_counts(
268 df: &DataFrame,
269 columns: &[config::ColumnConfig],
270) -> FloeResult<Vec<(String, u64)>> {
271 if df.height() == 0 {
272 return Ok(Vec::new());
273 }
274
275 let unique_columns: Vec<&config::ColumnConfig> = columns
276 .iter()
277 .filter(|col| col.unique == Some(true))
278 .collect();
279 if unique_columns.is_empty() {
280 return Ok(Vec::new());
281 }
282
283 let mut counts = Vec::new();
284 for column in unique_columns {
285 let series = df.column(&column.name).map_err(|err| {
286 Box::new(RunError(format!(
287 "unique column {} not found: {err}",
288 column.name
289 )))
290 })?;
291 let non_null = series.len().saturating_sub(series.null_count());
292 if non_null == 0 {
293 continue;
294 }
295 let unique = series.drop_nulls().n_unique().map_err(|err| {
296 Box::new(RunError(format!(
297 "unique column {} read failed: {err}",
298 column.name
299 )))
300 })?;
301 let violations = non_null.saturating_sub(unique) as u64;
302 if violations > 0 {
303 counts.push((column.name.clone(), violations));
304 }
305 }
306
307 Ok(counts)
308}
309
310pub fn resolve_schema_unique_keys(schema: &config::SchemaConfig) -> Vec<Vec<String>> {
311 let mut seen = HashSet::new();
312 let mut constraints = Vec::new();
313
314 if let Some(unique_keys) = schema.unique_keys.as_ref() {
315 for key in unique_keys {
316 let normalized = key
317 .iter()
318 .map(|column| column.trim().to_string())
319 .collect::<Vec<_>>();
320 if normalized.is_empty() {
321 continue;
322 }
323 let signature = normalized.join("\u{1f}");
324 if seen.insert(signature) {
325 constraints.push(normalized);
326 }
327 }
328 } else {
329 for column in legacy_unique_constraints(&schema.columns) {
330 let constraint = vec![column];
331 let signature = constraint.join("\u{1f}");
332 if seen.insert(signature) {
333 constraints.push(constraint);
334 }
335 }
336 }
337
338 if let Some(primary_key) = schema.primary_key.as_ref() {
339 let normalized = primary_key
340 .iter()
341 .map(|column| column.trim().to_string())
342 .collect::<Vec<_>>();
343 if !normalized.is_empty() {
344 let signature = normalized.join("\u{1f}");
345 if seen.insert(signature) {
346 constraints.push(normalized);
347 }
348 }
349 }
350
351 constraints
352}
353
354fn legacy_unique_constraints(columns: &[config::ColumnConfig]) -> Vec<String> {
355 columns
356 .iter()
357 .filter(|col| col.unique == Some(true))
358 .map(|col| col.name.trim().to_string())
359 .filter(|name| !name.is_empty())
360 .collect()
361}
362
363fn load_constraint_columns(df: &DataFrame, columns: &[String]) -> FloeResult<Vec<Series>> {
364 let mut output = Vec::with_capacity(columns.len());
365 for column in columns {
366 let series = df.column(column).map_err(|err| {
367 Box::new(RunError(format!(
368 "unique constraint column {} not found: {err}",
369 column
370 )))
371 })?;
372 output.push(series.as_materialized_series().rechunk());
373 }
374 Ok(output)
375}
376
377fn composite_key_from_row(columns: &[Series], row_idx: usize) -> FloeResult<Option<CompositeKey>> {
378 let mut key = Vec::with_capacity(columns.len());
379 for series in columns {
380 let value = series.get(row_idx).map_err(|err| {
381 Box::new(RunError(format!(
382 "unique constraint read failed at row {}: {err}",
383 row_idx
384 )))
385 })?;
386 let Some(value) = unique_key(value) else {
387 return Ok(None);
388 };
389 key.push(value);
390 }
391 Ok(Some(CompositeKey(key)))
392}
393
394fn unique_key(value: AnyValue) -> Option<UniqueKey> {
395 match value {
396 AnyValue::Null => None,
397 AnyValue::Boolean(value) => Some(UniqueKey::Bool(value)),
398 AnyValue::Int8(value) => Some(UniqueKey::I64(value as i64)),
399 AnyValue::Int16(value) => Some(UniqueKey::I64(value as i64)),
400 AnyValue::Int32(value) => Some(UniqueKey::I64(value as i64)),
401 AnyValue::Int64(value) => Some(UniqueKey::I64(value)),
402 AnyValue::Int128(value) => Some(UniqueKey::Other(value.to_string())),
403 AnyValue::UInt8(value) => Some(UniqueKey::U64(value as u64)),
404 AnyValue::UInt16(value) => Some(UniqueKey::U64(value as u64)),
405 AnyValue::UInt32(value) => Some(UniqueKey::U64(value as u64)),
406 AnyValue::UInt64(value) => Some(UniqueKey::U64(value)),
407 AnyValue::UInt128(value) => Some(UniqueKey::Other(value.to_string())),
408 AnyValue::Float32(value) => Some(UniqueKey::F64((value as f64).to_bits())),
409 AnyValue::Float64(value) => Some(UniqueKey::F64(value.to_bits())),
410 AnyValue::String(value) => Some(UniqueKey::String(value.to_string())),
411 AnyValue::StringOwned(value) => Some(UniqueKey::String(value.to_string())),
412 other => Some(UniqueKey::Other(other.to_string())),
413 }
414}