Skip to main content

icydb_core/db/query/predicate/
normalize.rs

1use crate::{
2    db::query::predicate::{
3        ast::{ComparePredicate, Predicate},
4        coercion::{CoercionId, CoercionSpec},
5    },
6    value::{Value, ValueEnum},
7};
8
9///
10/// Normalize a predicate into a canonical, deterministic form.
11///
12/// Normalization guarantees:
13/// - Logical equivalence is preserved
14/// - Nested AND / OR nodes are flattened
15/// - Neutral elements are removed (True / False)
16/// - Double negation is eliminated
17/// - Child predicates are deterministically ordered
18///
19/// Note: this pass does not normalize literal values (numeric width, collation).
20/// Ordering uses the structural `Value` representation.
21///
22/// This is used to ensure:
23/// - stable planner output
24/// - consistent caching / equality checks
25/// - predictable test behavior
26///
27#[must_use]
28pub fn normalize(predicate: &Predicate) -> Predicate {
29    match predicate {
30        Predicate::True => Predicate::True,
31        Predicate::False => Predicate::False,
32
33        Predicate::And(children) => normalize_and(children),
34        Predicate::Or(children) => normalize_or(children),
35        Predicate::Not(inner) => normalize_not(inner),
36
37        Predicate::Compare(cmp) => Predicate::Compare(normalize_compare(cmp)),
38
39        Predicate::IsNull { field } => Predicate::IsNull {
40            field: field.clone(),
41        },
42        Predicate::IsMissing { field } => Predicate::IsMissing {
43            field: field.clone(),
44        },
45        Predicate::IsEmpty { field } => Predicate::IsEmpty {
46            field: field.clone(),
47        },
48        Predicate::IsNotEmpty { field } => Predicate::IsNotEmpty {
49            field: field.clone(),
50        },
51        Predicate::TextContains { field, value } => Predicate::TextContains {
52            field: field.clone(),
53            value: value.clone(),
54        },
55        Predicate::TextContainsCi { field, value } => Predicate::TextContainsCi {
56            field: field.clone(),
57            value: value.clone(),
58        },
59    }
60}
61
62///
63/// Normalize a comparison predicate by cloning its components.
64///
65/// This function exists primarily for symmetry and future-proofing
66/// (e.g. if comparison-level rewrites are introduced later).
67///
68fn normalize_compare(cmp: &ComparePredicate) -> ComparePredicate {
69    ComparePredicate {
70        field: cmp.field.clone(),
71        op: cmp.op,
72        value: cmp.value.clone(),
73        coercion: cmp.coercion.clone(),
74    }
75}
76
77///
78/// Normalize a NOT expression.
79///
80/// Eliminates double negation:
81///     NOT (NOT x)  →  x
82///
83fn normalize_not(inner: &Predicate) -> Predicate {
84    let normalized = normalize(inner);
85
86    if let Predicate::Not(double) = normalized {
87        return normalize(&double);
88    }
89
90    Predicate::Not(Box::new(normalized))
91}
92
93///
94/// Normalize an AND expression.
95///
96/// Rules:
97/// - AND(True, x)        → x
98/// - AND(False, x)       → False
99/// - AND(AND(a, b), c)   → AND(a, b, c)
100/// - AND()               → True
101///
102/// Children are sorted deterministically.
103///
104fn normalize_and(children: &[Predicate]) -> Predicate {
105    let mut out = Vec::new();
106
107    for child in children {
108        let normalized = normalize(child);
109
110        match normalized {
111            Predicate::True => {}
112            Predicate::False => return Predicate::False,
113            Predicate::And(grandchildren) => out.extend(grandchildren),
114            other => out.push(other),
115        }
116    }
117
118    if out.is_empty() {
119        return Predicate::True;
120    }
121
122    out.sort_by_cached_key(sort_key);
123    Predicate::And(out)
124}
125
126///
127/// Normalize an OR expression.
128///
129/// Rules:
130/// - OR(False, x)       → x
131/// - OR(True, x)        → True
132/// - OR(OR(a, b), c)    → OR(a, b, c)
133/// - OR()               → False
134///
135/// Children are sorted deterministically.
136///
137fn normalize_or(children: &[Predicate]) -> Predicate {
138    let mut out = Vec::new();
139
140    for child in children {
141        let normalized = normalize(child);
142
143        match normalized {
144            Predicate::False => {}
145            Predicate::True => return Predicate::True,
146            Predicate::Or(grandchildren) => out.extend(grandchildren),
147            other => out.push(other),
148        }
149    }
150
151    if out.is_empty() {
152        return Predicate::False;
153    }
154
155    out.sort_by_cached_key(sort_key);
156    Predicate::Or(out)
157}
158
159///
160/// Generate a deterministic, length-prefixed key for a predicate.
161///
162/// This key is used **only for sorting**, not for display.
163/// Ordering ensures:
164/// - planner determinism
165/// - stable normalization
166/// - predictable equality
167///
168fn sort_key(predicate: &Predicate) -> Vec<u8> {
169    let mut out = Vec::new();
170    encode_predicate_key(&mut out, predicate);
171    out
172}
173
174const PRED_TRUE: u8 = 0x00;
175const PRED_FALSE: u8 = 0x01;
176const PRED_AND: u8 = 0x02;
177const PRED_OR: u8 = 0x03;
178const PRED_NOT: u8 = 0x04;
179const PRED_COMPARE: u8 = 0x05;
180const PRED_IS_NULL: u8 = 0x06;
181const PRED_IS_MISSING: u8 = 0x07;
182const PRED_IS_EMPTY: u8 = 0x08;
183const PRED_IS_NOT_EMPTY: u8 = 0x09;
184const PRED_TEXT_CONTAINS: u8 = 0x0D;
185const PRED_TEXT_CONTAINS_CI: u8 = 0x0E;
186
187// Encode predicate keys with length-prefixed segments to avoid collisions.
188fn encode_predicate_key(out: &mut Vec<u8>, predicate: &Predicate) {
189    match predicate {
190        Predicate::True => out.push(PRED_TRUE),
191        Predicate::False => out.push(PRED_FALSE),
192        Predicate::And(children) => {
193            out.push(PRED_AND);
194            push_len(out, children.len());
195            for child in children {
196                push_predicate(out, child);
197            }
198        }
199        Predicate::Or(children) => {
200            out.push(PRED_OR);
201            push_len(out, children.len());
202            for child in children {
203                push_predicate(out, child);
204            }
205        }
206        Predicate::Not(inner) => {
207            out.push(PRED_NOT);
208            push_predicate(out, inner);
209        }
210        Predicate::Compare(cmp) => {
211            out.push(PRED_COMPARE);
212            push_str(out, &cmp.field);
213            out.push(cmp.op.tag());
214            push_value(out, &cmp.value);
215            push_coercion(out, &cmp.coercion);
216        }
217        Predicate::IsNull { field } => {
218            out.push(PRED_IS_NULL);
219            push_str(out, field);
220        }
221        Predicate::IsMissing { field } => {
222            out.push(PRED_IS_MISSING);
223            push_str(out, field);
224        }
225        Predicate::IsEmpty { field } => {
226            out.push(PRED_IS_EMPTY);
227            push_str(out, field);
228        }
229        Predicate::IsNotEmpty { field } => {
230            out.push(PRED_IS_NOT_EMPTY);
231            push_str(out, field);
232        }
233        Predicate::TextContains { field, value } => {
234            out.push(PRED_TEXT_CONTAINS);
235            push_str(out, field);
236            push_value(out, value);
237        }
238        Predicate::TextContainsCi { field, value } => {
239            out.push(PRED_TEXT_CONTAINS_CI);
240            push_str(out, field);
241            push_value(out, value);
242        }
243    }
244}
245
246const VALUE_ACCOUNT: u8 = 1;
247const VALUE_BLOB: u8 = 2;
248const VALUE_BOOL: u8 = 3;
249const VALUE_DATE: u8 = 4;
250const VALUE_DECIMAL: u8 = 5;
251const VALUE_DURATION: u8 = 6;
252const VALUE_ENUM: u8 = 7;
253const VALUE_E8S: u8 = 8;
254const VALUE_E18S: u8 = 9;
255const VALUE_FLOAT32: u8 = 10;
256const VALUE_FLOAT64: u8 = 11;
257const VALUE_INT: u8 = 12;
258const VALUE_INT128: u8 = 13;
259const VALUE_INT_BIG: u8 = 14;
260const VALUE_LIST: u8 = 15;
261const VALUE_MAP: u8 = 16;
262const VALUE_NULL: u8 = 17;
263const VALUE_PRINCIPAL: u8 = 18;
264const VALUE_SUBACCOUNT: u8 = 19;
265const VALUE_TEXT: u8 = 20;
266const VALUE_TIMESTAMP: u8 = 21;
267const VALUE_UINT: u8 = 22;
268const VALUE_UINT128: u8 = 23;
269const VALUE_UINT_BIG: u8 = 24;
270const VALUE_ULID: u8 = 25;
271const VALUE_UNIT: u8 = 26;
272
273#[expect(clippy::too_many_lines)]
274fn encode_value_key(out: &mut Vec<u8>, value: &Value) {
275    match value {
276        Value::Account(v) => {
277            out.push(VALUE_ACCOUNT);
278            push_bytes(out, v.owner.as_slice());
279            match v.subaccount {
280                Some(sub) => {
281                    out.push(1);
282                    push_bytes(out, &sub.to_bytes());
283                }
284                None => out.push(0),
285            }
286        }
287        Value::Blob(v) => {
288            out.push(VALUE_BLOB);
289            push_bytes(out, v);
290        }
291        Value::Bool(v) => {
292            out.push(VALUE_BOOL);
293            out.push(u8::from(*v));
294        }
295        Value::Date(v) => {
296            out.push(VALUE_DATE);
297            out.extend_from_slice(&v.get().to_be_bytes());
298        }
299        Value::Decimal(v) => {
300            out.push(VALUE_DECIMAL);
301            out.push(u8::from(v.is_sign_negative()));
302            out.extend_from_slice(&v.scale().to_be_bytes());
303            out.extend_from_slice(&v.mantissa().to_be_bytes());
304        }
305        Value::Duration(v) => {
306            out.push(VALUE_DURATION);
307            out.extend_from_slice(&v.get().to_be_bytes());
308        }
309        Value::Enum(v) => {
310            out.push(VALUE_ENUM);
311            push_enum(out, v);
312        }
313        Value::E8s(v) => {
314            out.push(VALUE_E8S);
315            out.extend_from_slice(&v.get().to_be_bytes());
316        }
317        Value::E18s(v) => {
318            out.push(VALUE_E18S);
319            out.extend_from_slice(&v.get().to_be_bytes());
320        }
321        Value::Float32(v) => {
322            out.push(VALUE_FLOAT32);
323            out.extend_from_slice(&v.to_be_bytes());
324        }
325        Value::Float64(v) => {
326            out.push(VALUE_FLOAT64);
327            out.extend_from_slice(&v.to_be_bytes());
328        }
329        Value::Int(v) => {
330            out.push(VALUE_INT);
331            out.extend_from_slice(&v.to_be_bytes());
332        }
333        Value::Int128(v) => {
334            out.push(VALUE_INT128);
335            out.extend_from_slice(&v.get().to_be_bytes());
336        }
337        Value::IntBig(v) => {
338            out.push(VALUE_INT_BIG);
339            push_bytes(out, &v.to_leb128());
340        }
341        Value::List(items) => {
342            out.push(VALUE_LIST);
343            push_len(out, items.len());
344            for item in items {
345                push_value(out, item);
346            }
347        }
348        Value::Map(entries) => {
349            out.push(VALUE_MAP);
350            push_len(out, entries.len());
351            for (key, value) in entries {
352                push_value(out, key);
353                push_value(out, value);
354            }
355        }
356        Value::Null => out.push(VALUE_NULL),
357        Value::Principal(v) => {
358            out.push(VALUE_PRINCIPAL);
359            push_bytes(out, v.as_slice());
360        }
361        Value::Subaccount(v) => {
362            out.push(VALUE_SUBACCOUNT);
363            push_bytes(out, &v.to_bytes());
364        }
365        Value::Text(v) => {
366            out.push(VALUE_TEXT);
367            push_str(out, v);
368        }
369        Value::Timestamp(v) => {
370            out.push(VALUE_TIMESTAMP);
371            out.extend_from_slice(&v.get().to_be_bytes());
372        }
373        Value::Uint(v) => {
374            out.push(VALUE_UINT);
375            out.extend_from_slice(&v.to_be_bytes());
376        }
377        Value::Uint128(v) => {
378            out.push(VALUE_UINT128);
379            out.extend_from_slice(&v.get().to_be_bytes());
380        }
381        Value::UintBig(v) => {
382            out.push(VALUE_UINT_BIG);
383            push_bytes(out, &v.to_leb128());
384        }
385        Value::Ulid(v) => {
386            out.push(VALUE_ULID);
387            out.extend_from_slice(&v.to_bytes());
388        }
389        Value::Unit => out.push(VALUE_UNIT),
390    }
391}
392
393fn push_predicate(out: &mut Vec<u8>, predicate: &Predicate) {
394    let mut buf = Vec::new();
395    encode_predicate_key(&mut buf, predicate);
396    push_bytes(out, &buf);
397}
398
399fn push_value(out: &mut Vec<u8>, value: &Value) {
400    let mut buf = Vec::new();
401    encode_value_key(&mut buf, value);
402    push_bytes(out, &buf);
403}
404
405fn push_enum(out: &mut Vec<u8>, value: &ValueEnum) {
406    match &value.path {
407        Some(path) => {
408            out.push(1);
409            push_str(out, path);
410        }
411        None => out.push(0),
412    }
413    push_str(out, &value.variant);
414    match &value.payload {
415        Some(payload) => {
416            out.push(1);
417            push_value(out, payload);
418        }
419        None => out.push(0),
420    }
421}
422
423fn push_coercion(out: &mut Vec<u8>, spec: &CoercionSpec) {
424    out.push(coercion_id_tag(spec.id));
425    push_len(out, spec.params.len());
426    for (key, value) in &spec.params {
427        push_str(out, key);
428        push_str(out, value);
429    }
430}
431
432const fn coercion_id_tag(id: CoercionId) -> u8 {
433    match id {
434        CoercionId::Strict => 0,
435        CoercionId::NumericWiden => 1,
436        CoercionId::TextCasefold => 3,
437        CoercionId::CollectionElement => 4,
438    }
439}
440
// Append `len` as a fixed-width, big-endian u64.
//
// A length that does not fit in a u64 saturates to `u64::MAX` instead of
// failing, so the encoding is total and stays deterministic.
fn push_len(out: &mut Vec<u8>, len: usize) {
    let prefix = u64::try_from(len).unwrap_or(u64::MAX).to_be_bytes();
    out.extend_from_slice(&prefix);
}
446
447fn push_bytes(out: &mut Vec<u8>, bytes: &[u8]) {
448    push_len(out, bytes.len());
449    out.extend_from_slice(bytes);
450}
451
452fn push_str(out: &mut Vec<u8>, s: &str) {
453    push_bytes(out, s.as_bytes());
454}
455
///
/// TESTS
///

#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::query::predicate::CompareOp;

    // Build `field == List(items)` with the default coercion.
    fn eq_list(items: Vec<Value>) -> Predicate {
        Predicate::Compare(ComparePredicate {
            field: "field".to_string(),
            op: CompareOp::Eq,
            value: Value::List(items),
            coercion: CoercionSpec::default(),
        })
    }

    // A one-element list whose text contains a comma must not get the same
    // key as a two-element list holding the pieces on either side of it.
    #[test]
    fn sort_key_distinguishes_list_text_with_delimiters() {
        let joined = eq_list(vec![Value::Text("a,b".to_string())]);
        let split = eq_list(vec![
            Value::Text("a".to_string()),
            Value::Text("b".to_string()),
        ]);

        assert_ne!(sort_key(&joined), sort_key(&split));
    }
}
485}