Skip to main content

icydb_core/db/predicate/
schema.rs

//! Module: predicate::schema
//! Responsibility: schema-aware predicate validation and coercion legality checks.
//! Does not own: runtime predicate execution or index planning strategy.
//! Boundary: validation boundary between user predicates and executable plans.
5
6use crate::{
7    db::{
8        identity::{EntityName, EntityNameError, IndexName, IndexNameError},
9        predicate::{
10            CoercionId, CoercionSpec, CompareOp, ComparePredicate, Predicate,
11            model::UnsupportedQueryFeature, supports_coercion,
12        },
13    },
14    model::{entity::EntityModel, field::FieldKind, index::IndexModel},
15    traits::FieldValueKind,
16    value::{CoercionFamily, CoercionFamilyExt, Value},
17};
18use std::collections::{BTreeMap, BTreeSet};
19use std::fmt;
20
///
/// ScalarType
///
/// Compact scalar classification consumed by predicate validation.
/// It is intentionally narrower than the full schema/type system and is
/// only meant to back:
/// - coercion rules
/// - literal compatibility checks
/// - operator validity (ordering, equality)
///

#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum ScalarType {
    Account,
    Blob,
    Bool,
    Date,
    Decimal,
    Duration,
    Enum,
    Float32,
    Float64,
    Int,
    Int128,
    IntBig,
    Principal,
    Subaccount,
    Text,
    Timestamp,
    Uint,
    Uint128,
    UintBig,
    Ulid,
    Unit,
}
56
// Local helpers to expand the scalar registry into match arms.
// Turns the registry entries into a `match` that yields each scalar's
// coercion-family expression. Only the scalar name and the family column are
// used; the remaining columns are matched so the entry shape stays uniform.
macro_rules! scalar_coercion_family_from_registry {
    (
        @args $subject:expr;
        @entries $(
            (
                $scalar:ident,
                $family:expr,
                $value_pat:pat,
                is_numeric_value = $numeric_value:expr,
                supports_numeric_coercion = $numeric_coercion:expr,
                supports_arithmetic = $arithmetic:expr,
                supports_equality = $equality:expr,
                supports_ordering = $ordering:expr,
                is_keyable = $keyable:expr,
                is_storage_key_encodable = $key_encodable:expr
            )
        ),* $(,)?
    ) => {
        match $subject {
            $( ScalarType::$scalar => $family, )*
        }
    };
}
65
// Turns the registry entries into a single `matches!` over the
// `(scalar, value)` pair: the result is true only when the value matches the
// value pattern registered for that scalar.
macro_rules! scalar_matches_value_from_registry {
    (
        @args $subject:expr, $candidate:expr;
        @entries $(
            (
                $scalar:ident,
                $family:expr,
                $value_pat:pat,
                is_numeric_value = $numeric_value:expr,
                supports_numeric_coercion = $numeric_coercion:expr,
                supports_arithmetic = $arithmetic:expr,
                supports_equality = $equality:expr,
                supports_ordering = $ordering:expr,
                is_keyable = $keyable:expr,
                is_storage_key_encodable = $key_encodable:expr
            )
        ),* $(,)?
    ) => {
        matches!(
            ($subject, $candidate),
            $( (ScalarType::$scalar, $value_pat) )|*
        )
    };
}
74
// Turns the registry entries into a `match` that yields each scalar's
// `supports_numeric_coercion` column.
macro_rules! scalar_supports_numeric_coercion_from_registry {
    (
        @args $subject:expr;
        @entries $(
            (
                $scalar:ident,
                $family:expr,
                $value_pat:pat,
                is_numeric_value = $numeric_value:expr,
                supports_numeric_coercion = $numeric_coercion:expr,
                supports_arithmetic = $arithmetic:expr,
                supports_equality = $equality:expr,
                supports_ordering = $ordering:expr,
                is_keyable = $keyable:expr,
                is_storage_key_encodable = $key_encodable:expr
            )
        ),* $(,)?
    ) => {
        match $subject {
            $( ScalarType::$scalar => $numeric_coercion, )*
        }
    };
}
82
// Turns the registry entries into a `match` that yields each scalar's
// `is_keyable` column.
macro_rules! scalar_is_keyable_from_registry {
    (
        @args $subject:expr;
        @entries $(
            (
                $scalar:ident,
                $family:expr,
                $value_pat:pat,
                is_numeric_value = $numeric_value:expr,
                supports_numeric_coercion = $numeric_coercion:expr,
                supports_arithmetic = $arithmetic:expr,
                supports_equality = $equality:expr,
                supports_ordering = $ordering:expr,
                is_keyable = $keyable:expr,
                is_storage_key_encodable = $key_encodable:expr
            )
        ),* $(,)?
    ) => {
        match $subject {
            $( ScalarType::$scalar => $keyable, )*
        }
    };
}
90
// Turns the registry entries into a `match` that yields each scalar's
// `supports_ordering` column.
macro_rules! scalar_supports_ordering_from_registry {
    (
        @args $subject:expr;
        @entries $(
            (
                $scalar:ident,
                $family:expr,
                $value_pat:pat,
                is_numeric_value = $numeric_value:expr,
                supports_numeric_coercion = $numeric_coercion:expr,
                supports_arithmetic = $arithmetic:expr,
                supports_equality = $equality:expr,
                supports_ordering = $ordering:expr,
                is_keyable = $keyable:expr,
                is_storage_key_encodable = $key_encodable:expr
            )
        ),* $(,)?
    ) => {
        match $subject {
            $( ScalarType::$scalar => $ordering, )*
        }
    };
}
98
99impl ScalarType {
100    #[must_use]
101    pub(crate) const fn coercion_family(&self) -> CoercionFamily {
102        scalar_registry!(scalar_coercion_family_from_registry, self)
103    }
104
105    #[must_use]
106    pub(crate) const fn is_orderable(&self) -> bool {
107        // Predicate-level ordering gate.
108        // Delegates to registry-backed supports_ordering.
109        self.supports_ordering()
110    }
111
112    #[must_use]
113    pub(crate) const fn matches_value(&self, value: &Value) -> bool {
114        scalar_registry!(scalar_matches_value_from_registry, self, value)
115    }
116
117    #[must_use]
118    pub(crate) const fn supports_numeric_coercion(&self) -> bool {
119        scalar_registry!(scalar_supports_numeric_coercion_from_registry, self)
120    }
121
122    #[must_use]
123    pub(crate) const fn is_keyable(&self) -> bool {
124        scalar_registry!(scalar_is_keyable_from_registry, self)
125    }
126
127    #[must_use]
128    pub(crate) const fn supports_ordering(&self) -> bool {
129        scalar_registry!(scalar_supports_ordering_from_registry, self)
130    }
131}
132
133///
134/// FieldType
135///
136/// Reduced runtime type representation used exclusively for predicate validation.
137/// This intentionally drops:
138/// - record structure
139/// - tuple structure
140/// - validator/sanitizer metadata
141///
142
143#[derive(Clone, Debug, Eq, PartialEq)]
144pub(crate) enum FieldType {
145    Scalar(ScalarType),
146    List(Box<Self>),
147    Set(Box<Self>),
148    Map { key: Box<Self>, value: Box<Self> },
149    Structured { queryable: bool },
150}
151
152impl FieldType {
153    #[must_use]
154    pub(crate) const fn value_kind(&self) -> FieldValueKind {
155        match self {
156            Self::Scalar(_) => FieldValueKind::Atomic,
157            Self::List(_) | Self::Set(_) => FieldValueKind::Structured { queryable: true },
158            Self::Map { .. } => FieldValueKind::Structured { queryable: false },
159            Self::Structured { queryable } => FieldValueKind::Structured {
160                queryable: *queryable,
161            },
162        }
163    }
164
165    #[must_use]
166    pub(crate) const fn coercion_family(&self) -> Option<CoercionFamily> {
167        match self {
168            Self::Scalar(inner) => Some(inner.coercion_family()),
169            Self::List(_) | Self::Set(_) | Self::Map { .. } => Some(CoercionFamily::Collection),
170            Self::Structured { .. } => None,
171        }
172    }
173
174    #[must_use]
175    pub(crate) const fn is_text(&self) -> bool {
176        matches!(self, Self::Scalar(ScalarType::Text))
177    }
178
179    #[must_use]
180    pub(crate) const fn is_collection(&self) -> bool {
181        matches!(self, Self::List(_) | Self::Set(_) | Self::Map { .. })
182    }
183
184    #[must_use]
185    pub(crate) const fn is_list_like(&self) -> bool {
186        matches!(self, Self::List(_) | Self::Set(_))
187    }
188
189    #[must_use]
190    pub(crate) const fn is_orderable(&self) -> bool {
191        match self {
192            Self::Scalar(inner) => inner.is_orderable(),
193            _ => false,
194        }
195    }
196
197    #[must_use]
198    pub(crate) const fn is_keyable(&self) -> bool {
199        match self {
200            Self::Scalar(inner) => inner.is_keyable(),
201            _ => false,
202        }
203    }
204
205    #[must_use]
206    pub(crate) const fn supports_numeric_coercion(&self) -> bool {
207        match self {
208            Self::Scalar(inner) => inner.supports_numeric_coercion(),
209            _ => false,
210        }
211    }
212}
213
214pub(crate) fn literal_matches_type(literal: &Value, field_type: &FieldType) -> bool {
215    match field_type {
216        FieldType::Scalar(inner) => inner.matches_value(literal),
217        FieldType::List(element) | FieldType::Set(element) => match literal {
218            Value::List(items) => items.iter().all(|item| literal_matches_type(item, element)),
219            _ => false,
220        },
221        FieldType::Map { key, value } => match literal {
222            Value::Map(entries) => {
223                if Value::validate_map_entries(entries.as_slice()).is_err() {
224                    return false;
225                }
226
227                entries.iter().all(|(entry_key, entry_value)| {
228                    literal_matches_type(entry_key, key) && literal_matches_type(entry_value, value)
229                })
230            }
231            _ => false,
232        },
233        FieldType::Structured { .. } => {
234            // NOTE: non-queryable structured field types never match predicate literals.
235            false
236        }
237    }
238}
239
240pub(super) fn field_type_from_model_kind(kind: &FieldKind) -> FieldType {
241    match kind {
242        FieldKind::Account => FieldType::Scalar(ScalarType::Account),
243        FieldKind::Blob => FieldType::Scalar(ScalarType::Blob),
244        FieldKind::Bool => FieldType::Scalar(ScalarType::Bool),
245        FieldKind::Date => FieldType::Scalar(ScalarType::Date),
246        FieldKind::Decimal { .. } => FieldType::Scalar(ScalarType::Decimal),
247        FieldKind::Duration => FieldType::Scalar(ScalarType::Duration),
248        FieldKind::Enum { .. } => FieldType::Scalar(ScalarType::Enum),
249        FieldKind::Float32 => FieldType::Scalar(ScalarType::Float32),
250        FieldKind::Float64 => FieldType::Scalar(ScalarType::Float64),
251        FieldKind::Int => FieldType::Scalar(ScalarType::Int),
252        FieldKind::Int128 => FieldType::Scalar(ScalarType::Int128),
253        FieldKind::IntBig => FieldType::Scalar(ScalarType::IntBig),
254        FieldKind::Principal => FieldType::Scalar(ScalarType::Principal),
255        FieldKind::Subaccount => FieldType::Scalar(ScalarType::Subaccount),
256        FieldKind::Text => FieldType::Scalar(ScalarType::Text),
257        FieldKind::Timestamp => FieldType::Scalar(ScalarType::Timestamp),
258        FieldKind::Uint => FieldType::Scalar(ScalarType::Uint),
259        FieldKind::Uint128 => FieldType::Scalar(ScalarType::Uint128),
260        FieldKind::UintBig => FieldType::Scalar(ScalarType::UintBig),
261        FieldKind::Ulid => FieldType::Scalar(ScalarType::Ulid),
262        FieldKind::Unit => FieldType::Scalar(ScalarType::Unit),
263        FieldKind::Relation { key_kind, .. } => field_type_from_model_kind(key_kind),
264        FieldKind::List(inner) => FieldType::List(Box::new(field_type_from_model_kind(inner))),
265        FieldKind::Set(inner) => FieldType::Set(Box::new(field_type_from_model_kind(inner))),
266        FieldKind::Map { key, value } => FieldType::Map {
267            key: Box::new(field_type_from_model_kind(key)),
268            value: Box::new(field_type_from_model_kind(value)),
269        },
270        FieldKind::Structured { queryable } => FieldType::Structured {
271            queryable: *queryable,
272        },
273    }
274}
275
276impl fmt::Display for FieldType {
277    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
278        match self {
279            Self::Scalar(inner) => write!(f, "{inner:?}"),
280            Self::List(inner) => write!(f, "List<{inner}>"),
281            Self::Set(inner) => write!(f, "Set<{inner}>"),
282            Self::Map { key, value } => write!(f, "Map<{key}, {value}>"),
283            Self::Structured { queryable } => {
284                write!(f, "Structured<queryable={queryable}>")
285            }
286        }
287    }
288}
289
290fn validate_index_fields(
291    fields: &BTreeMap<String, FieldType>,
292    indexes: &[&IndexModel],
293) -> Result<(), ValidateError> {
294    let mut seen_names = BTreeSet::new();
295    for index in indexes {
296        if seen_names.contains(index.name) {
297            return Err(ValidateError::DuplicateIndexName {
298                name: index.name.to_string(),
299            });
300        }
301        seen_names.insert(index.name);
302
303        let mut seen = BTreeSet::new();
304        for field in index.fields {
305            if !fields.contains_key(*field) {
306                return Err(ValidateError::IndexFieldUnknown {
307                    index: **index,
308                    field: (*field).to_string(),
309                });
310            }
311            if seen.contains(*field) {
312                return Err(ValidateError::IndexFieldDuplicate {
313                    index: **index,
314                    field: (*field).to_string(),
315                });
316            }
317            seen.insert(*field);
318
319            let field_type = fields
320                .get(*field)
321                .expect("index field existence checked above");
322            // Guardrail: map fields are deterministic stored values but remain
323            // non-queryable and non-indexable in 0.7.
324            if matches!(field_type, FieldType::Map { .. }) {
325                return Err(ValidateError::IndexFieldMapNotQueryable {
326                    index: **index,
327                    field: (*field).to_string(),
328                });
329            }
330            if !field_type.value_kind().is_queryable() {
331                return Err(ValidateError::IndexFieldNotQueryable {
332                    index: **index,
333                    field: (*field).to_string(),
334                });
335            }
336        }
337    }
338
339    Ok(())
340}
341
342///
343/// SchemaInfo
344///
345/// Lightweight, runtime-usable field-type map for one entity.
346/// This is the *only* schema surface the predicate validator depends on.
347///
348
349#[derive(Clone, Debug)]
350pub(crate) struct SchemaInfo {
351    fields: BTreeMap<String, FieldType>,
352    field_kinds: BTreeMap<String, FieldKind>,
353}
354
355impl SchemaInfo {
356    #[must_use]
357    pub(crate) fn field(&self, name: &str) -> Option<&FieldType> {
358        self.fields.get(name)
359    }
360
361    #[must_use]
362    pub(crate) fn field_kind(&self, name: &str) -> Option<&FieldKind> {
363        self.field_kinds.get(name)
364    }
365
366    /// Builds runtime predicate schema information from an entity model.
367    pub(crate) fn from_entity_model(model: &EntityModel) -> Result<Self, ValidateError> {
368        // Validate identity constraints before building schema maps.
369        let entity_name = EntityName::try_from_str(model.entity_name).map_err(|err| {
370            ValidateError::InvalidEntityName {
371                name: model.entity_name.to_string(),
372                source: err,
373            }
374        })?;
375
376        if !model
377            .fields
378            .iter()
379            .any(|field| std::ptr::eq(field, model.primary_key))
380        {
381            return Err(ValidateError::InvalidPrimaryKey {
382                field: model.primary_key.name.to_string(),
383            });
384        }
385
386        let mut fields = BTreeMap::new();
387        let mut field_kinds = BTreeMap::new();
388        for field in model.fields {
389            if fields.contains_key(field.name) {
390                return Err(ValidateError::DuplicateField {
391                    field: field.name.to_string(),
392                });
393            }
394            let ty = field_type_from_model_kind(&field.kind);
395            fields.insert(field.name.to_string(), ty);
396            field_kinds.insert(field.name.to_string(), field.kind);
397        }
398
399        let pk_field_type = fields
400            .get(model.primary_key.name)
401            .expect("primary key verified above");
402        if !pk_field_type.is_keyable() {
403            return Err(ValidateError::InvalidPrimaryKeyType {
404                field: model.primary_key.name.to_string(),
405            });
406        }
407
408        validate_index_fields(&fields, model.indexes)?;
409        for index in model.indexes {
410            IndexName::try_from_parts(&entity_name, index.fields).map_err(|err| {
411                ValidateError::InvalidIndexName {
412                    index: **index,
413                    source: err,
414                }
415            })?;
416        }
417
418        Ok(Self {
419            fields,
420            field_kinds,
421        })
422    }
423}
424
425/// Predicate/schema validation failures, including invalid model contracts.
426#[derive(Debug, thiserror::Error)]
427pub enum ValidateError {
428    #[error("invalid entity name '{name}': {source}")]
429    InvalidEntityName {
430        name: String,
431        #[source]
432        source: EntityNameError,
433    },
434
435    #[error("invalid index name for '{index}': {source}")]
436    InvalidIndexName {
437        index: IndexModel,
438        #[source]
439        source: IndexNameError,
440    },
441
442    #[error("unknown field '{field}'")]
443    UnknownField { field: String },
444
445    #[error("field '{field}' is not queryable")]
446    NonQueryableFieldType { field: String },
447
448    #[error("duplicate field '{field}'")]
449    DuplicateField { field: String },
450
451    #[error("{0}")]
452    UnsupportedQueryFeature(#[from] UnsupportedQueryFeature),
453
454    #[error("primary key '{field}' not present in entity fields")]
455    InvalidPrimaryKey { field: String },
456
457    #[error("primary key '{field}' has a non-keyable type")]
458    InvalidPrimaryKeyType { field: String },
459
460    #[error("index '{index}' references unknown field '{field}'")]
461    IndexFieldUnknown { index: IndexModel, field: String },
462
463    #[error("index '{index}' references non-queryable field '{field}'")]
464    IndexFieldNotQueryable { index: IndexModel, field: String },
465
466    #[error(
467        "index '{index}' references map field '{field}'; map fields are not queryable in icydb 0.7"
468    )]
469    IndexFieldMapNotQueryable { index: IndexModel, field: String },
470
471    #[error("index '{index}' repeats field '{field}'")]
472    IndexFieldDuplicate { index: IndexModel, field: String },
473
474    #[error("duplicate index name '{name}'")]
475    DuplicateIndexName { name: String },
476
477    #[error("operator {op} is not valid for field '{field}'")]
478    InvalidOperator { field: String, op: String },
479
480    #[error("coercion {coercion:?} is not valid for field '{field}'")]
481    InvalidCoercion { field: String, coercion: CoercionId },
482
483    #[error("invalid literal for field '{field}': {message}")]
484    InvalidLiteral { field: String, message: String },
485}
486
487impl ValidateError {
488    pub(crate) fn invalid_operator(field: &str, op: impl fmt::Display) -> Self {
489        Self::InvalidOperator {
490            field: field.to_string(),
491            op: op.to_string(),
492        }
493    }
494
495    pub(crate) fn invalid_literal(field: &str, msg: &str) -> Self {
496        Self::InvalidLiteral {
497            field: field.to_string(),
498            message: msg.to_string(),
499        }
500    }
501}
502
503/// Reject policy-level non-queryable features before planning.
504pub(crate) fn reject_unsupported_query_features(
505    predicate: &Predicate,
506) -> Result<(), UnsupportedQueryFeature> {
507    match predicate {
508        Predicate::True
509        | Predicate::False
510        | Predicate::Compare(_)
511        | Predicate::IsNull { .. }
512        | Predicate::IsMissing { .. }
513        | Predicate::IsEmpty { .. }
514        | Predicate::IsNotEmpty { .. }
515        | Predicate::TextContains { .. }
516        | Predicate::TextContainsCi { .. } => Ok(()),
517        Predicate::And(children) | Predicate::Or(children) => {
518            for child in children {
519                reject_unsupported_query_features(child)?;
520            }
521
522            Ok(())
523        }
524        Predicate::Not(inner) => reject_unsupported_query_features(inner),
525    }
526}
527
528/// Validates a predicate against the provided schema information.
529pub(crate) fn validate(schema: &SchemaInfo, predicate: &Predicate) -> Result<(), ValidateError> {
530    reject_unsupported_query_features(predicate)?;
531
532    match predicate {
533        Predicate::True | Predicate::False => Ok(()),
534        Predicate::And(children) | Predicate::Or(children) => {
535            for child in children {
536                validate(schema, child)?;
537            }
538            Ok(())
539        }
540        Predicate::Not(inner) => validate(schema, inner),
541        Predicate::Compare(cmp) => validate_compare(schema, cmp),
542        Predicate::IsNull { field } | Predicate::IsMissing { field } => {
543            let _field_type = ensure_field(schema, field)?;
544            Ok(())
545        }
546        Predicate::IsEmpty { field } => {
547            let field_type = ensure_field(schema, field)?;
548            if field_type.is_text() || field_type.is_collection() {
549                Ok(())
550            } else {
551                Err(ValidateError::invalid_operator(field, "is_empty"))
552            }
553        }
554        Predicate::IsNotEmpty { field } => {
555            let field_type = ensure_field(schema, field)?;
556            if field_type.is_text() || field_type.is_collection() {
557                Ok(())
558            } else {
559                Err(ValidateError::invalid_operator(field, "is_not_empty"))
560            }
561        }
562        Predicate::TextContains { field, value } => {
563            validate_text_contains(schema, field, value, "text_contains")
564        }
565        Predicate::TextContainsCi { field, value } => {
566            validate_text_contains(schema, field, value, "text_contains_ci")
567        }
568    }
569}
570
571fn validate_compare(schema: &SchemaInfo, cmp: &ComparePredicate) -> Result<(), ValidateError> {
572    let field_type = ensure_field(schema, &cmp.field)?;
573
574    match cmp.op {
575        CompareOp::Eq | CompareOp::Ne => {
576            validate_eq_ne(&cmp.field, field_type, &cmp.value, &cmp.coercion)
577        }
578        CompareOp::Lt | CompareOp::Lte | CompareOp::Gt | CompareOp::Gte => {
579            validate_ordering(&cmp.field, field_type, &cmp.value, &cmp.coercion, cmp.op)
580        }
581        CompareOp::In | CompareOp::NotIn => {
582            validate_in(&cmp.field, field_type, &cmp.value, &cmp.coercion, cmp.op)
583        }
584        CompareOp::Contains => validate_contains(&cmp.field, field_type, &cmp.value, &cmp.coercion),
585        CompareOp::StartsWith | CompareOp::EndsWith => {
586            validate_text_compare(&cmp.field, field_type, &cmp.value, &cmp.coercion, cmp.op)
587        }
588    }
589}
590
591fn validate_eq_ne(
592    field: &str,
593    field_type: &FieldType,
594    value: &Value,
595    coercion: &CoercionSpec,
596) -> Result<(), ValidateError> {
597    if field_type.is_list_like() {
598        ensure_list_literal(field, value, field_type)?;
599    } else {
600        ensure_scalar_literal(field, value)?;
601    }
602
603    ensure_coercion(field, field_type, value, coercion)
604}
605
606fn validate_ordering(
607    field: &str,
608    field_type: &FieldType,
609    value: &Value,
610    coercion: &CoercionSpec,
611    op: CompareOp,
612) -> Result<(), ValidateError> {
613    if matches!(coercion.id, CoercionId::CollectionElement) {
614        return Err(ValidateError::InvalidCoercion {
615            field: field.to_string(),
616            coercion: coercion.id,
617        });
618    }
619
620    if !field_type.is_orderable() {
621        return Err(ValidateError::invalid_operator(field, format!("{op:?}")));
622    }
623
624    ensure_scalar_literal(field, value)?;
625
626    ensure_coercion(field, field_type, value, coercion)
627}
628
629/// Validate list membership predicates.
630fn validate_in(
631    field: &str,
632    field_type: &FieldType,
633    value: &Value,
634    coercion: &CoercionSpec,
635    op: CompareOp,
636) -> Result<(), ValidateError> {
637    if field_type.is_collection() {
638        return Err(ValidateError::invalid_operator(field, format!("{op:?}")));
639    }
640
641    let Value::List(items) = value else {
642        return Err(ValidateError::invalid_literal(
643            field,
644            "expected list literal",
645        ));
646    };
647
648    for item in items {
649        ensure_coercion(field, field_type, item, coercion)?;
650    }
651
652    Ok(())
653}
654
655/// Validate collection containment predicates on list/set fields.
656fn validate_contains(
657    field: &str,
658    field_type: &FieldType,
659    value: &Value,
660    coercion: &CoercionSpec,
661) -> Result<(), ValidateError> {
662    if field_type.is_text() {
663        // CONTRACT: text substring matching uses TextContains/TextContainsCi only.
664        return Err(ValidateError::invalid_operator(
665            field,
666            format!("{:?}", CompareOp::Contains),
667        ));
668    }
669
670    let element_type = match field_type {
671        FieldType::List(inner) | FieldType::Set(inner) => inner.as_ref(),
672        _ => {
673            return Err(ValidateError::invalid_operator(
674                field,
675                format!("{:?}", CompareOp::Contains),
676            ));
677        }
678    };
679
680    if matches!(coercion.id, CoercionId::TextCasefold) {
681        // CONTRACT: case-insensitive coercion never applies to structured values.
682        return Err(ValidateError::InvalidCoercion {
683            field: field.to_string(),
684            coercion: coercion.id,
685        });
686    }
687
688    ensure_coercion(field, element_type, value, coercion)
689}
690
691/// Validate text prefix/suffix comparisons.
692fn validate_text_compare(
693    field: &str,
694    field_type: &FieldType,
695    value: &Value,
696    coercion: &CoercionSpec,
697    op: CompareOp,
698) -> Result<(), ValidateError> {
699    if !field_type.is_text() {
700        return Err(ValidateError::invalid_operator(field, format!("{op:?}")));
701    }
702
703    ensure_text_literal(field, value)?;
704
705    ensure_coercion(field, field_type, value, coercion)
706}
707
708/// Validate substring predicates on text fields.
709fn validate_text_contains(
710    schema: &SchemaInfo,
711    field: &str,
712    value: &Value,
713    op: &str,
714) -> Result<(), ValidateError> {
715    let field_type = ensure_field(schema, field)?;
716    if !field_type.is_text() {
717        return Err(ValidateError::invalid_operator(field, op));
718    }
719
720    ensure_text_literal(field, value)?;
721
722    Ok(())
723}
724
725fn ensure_field<'a>(schema: &'a SchemaInfo, field: &str) -> Result<&'a FieldType, ValidateError> {
726    let field_type = schema
727        .field(field)
728        .ok_or_else(|| ValidateError::UnknownField {
729            field: field.to_string(),
730        })?;
731
732    if matches!(field_type, FieldType::Map { .. }) {
733        return Err(UnsupportedQueryFeature::MapPredicate {
734            field: field.to_string(),
735        }
736        .into());
737    }
738
739    if !field_type.value_kind().is_queryable() {
740        return Err(ValidateError::NonQueryableFieldType {
741            field: field.to_string(),
742        });
743    }
744
745    Ok(field_type)
746}
747
748// Ensure the literal is text to match text-only operators.
749fn ensure_text_literal(field: &str, value: &Value) -> Result<(), ValidateError> {
750    if !matches!(value, Value::Text(_)) {
751        return Err(ValidateError::invalid_literal(
752            field,
753            "expected text literal",
754        ));
755    }
756
757    Ok(())
758}
759
760// Reject list literals when scalar comparisons are required.
761fn ensure_scalar_literal(field: &str, value: &Value) -> Result<(), ValidateError> {
762    if matches!(value, Value::List(_)) {
763        return Err(ValidateError::invalid_literal(
764            field,
765            "expected scalar literal",
766        ));
767    }
768
769    Ok(())
770}
771
772fn ensure_coercion(
773    field: &str,
774    field_type: &FieldType,
775    literal: &Value,
776    coercion: &CoercionSpec,
777) -> Result<(), ValidateError> {
778    if matches!(coercion.id, CoercionId::TextCasefold) && !field_type.is_text() {
779        // CONTRACT: case-insensitive coercions are text-only.
780        return Err(ValidateError::InvalidCoercion {
781            field: field.to_string(),
782            coercion: coercion.id,
783        });
784    }
785
786    // NOTE:
787    // NumericWiden eligibility is registry-authoritative.
788    // CoercionFamily::Numeric is intentionally NOT sufficient.
789    // This prevents validation/runtime divergence for Date, IntBig, UintBig.
790    if matches!(coercion.id, CoercionId::NumericWiden)
791        && (!field_type.supports_numeric_coercion() || !literal.supports_numeric_coercion())
792    {
793        return Err(ValidateError::InvalidCoercion {
794            field: field.to_string(),
795            coercion: coercion.id,
796        });
797    }
798
799    if !matches!(coercion.id, CoercionId::NumericWiden) {
800        let left_family =
801            field_type
802                .coercion_family()
803                .ok_or_else(|| ValidateError::NonQueryableFieldType {
804                    field: field.to_string(),
805                })?;
806        let right_family = literal.coercion_family();
807
808        if !supports_coercion(left_family, right_family, coercion.id) {
809            return Err(ValidateError::InvalidCoercion {
810                field: field.to_string(),
811                coercion: coercion.id,
812            });
813        }
814    }
815
816    if matches!(
817        coercion.id,
818        CoercionId::Strict | CoercionId::CollectionElement
819    ) && !literal_matches_type(literal, field_type)
820    {
821        return Err(ValidateError::invalid_literal(
822            field,
823            "literal type does not match field type",
824        ));
825    }
826
827    Ok(())
828}
829
830fn ensure_list_literal(
831    field: &str,
832    literal: &Value,
833    field_type: &FieldType,
834) -> Result<(), ValidateError> {
835    if !literal_matches_type(literal, field_type) {
836        return Err(ValidateError::invalid_literal(
837            field,
838            "list literal does not match field element type",
839        ));
840    }
841
842    Ok(())
843}