Skip to main content

icydb_core/db/predicate/
schema.rs

1use crate::{
2    db::{
3        identity::{EntityName, EntityNameError, IndexName, IndexNameError},
4        predicate::{
5            CoercionSpec, CompareOp, ComparePredicate, Predicate,
6            model::UnsupportedQueryFeature,
7            semantics::{CoercionId, supports_coercion},
8        },
9    },
10    model::{entity::EntityModel, field::FieldKind, index::IndexModel},
11    traits::FieldValueKind,
12    value::{CoercionFamily, CoercionFamilyExt, Value},
13};
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16
///
/// ScalarType
///
/// Internal scalar classification used by predicate validation.
/// This is deliberately *smaller* than the full schema/type system
/// and exists only to support:
/// - coercion rules
/// - literal compatibility checks
/// - operator validity (ordering, equality)
///
/// Variants carry no payload. Per-variant capabilities (coercion family,
/// keyability, ordering support, numeric coercion) are not stored here; the
/// inherent methods look them up through the `scalar_registry!` macro.
/// The derived `Debug` name of a variant is also what `FieldType`'s
/// `Display` implementation prints for scalar fields.
///

#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum ScalarType {
    Account,
    Blob,
    Bool,
    Date,
    Decimal,
    Duration,
    Enum,
    Float32,
    Float64,
    Int,
    Int128,
    IntBig,
    Principal,
    Subaccount,
    Text,
    Timestamp,
    Uint,
    Uint128,
    UintBig,
    Ulid,
    Unit,
}
52
// Local helpers to expand the scalar registry into match arms.
//
// Every helper accepts the same `@args ...; @entries (...),*` input shape and
// keeps exactly one column of each registry entry, ignoring the rest.
// NOTE(review): `scalar_registry!` is defined outside this file; presumably it
// forwards its extra arguments as `@args` and supplies the registry rows as
// `@entries` — confirm against its definition before changing the pattern.

// Expands to a `match` on `$self` that maps every `ScalarType::$scalar` to the
// entry's `$coercion_family` expression.
macro_rules! scalar_coercion_family_from_registry {
    ( @args $self:expr; @entries $( ($scalar:ident, $coercion_family:expr, $value_pat:pat, is_numeric_value = $is_numeric:expr, supports_numeric_coercion = $supports_numeric_coercion:expr, supports_arithmetic = $supports_arithmetic:expr, supports_equality = $supports_equality:expr, supports_ordering = $supports_ordering:expr, is_keyable = $is_keyable:expr, is_storage_key_encodable = $is_storage_key_encodable:expr) ),* $(,)? ) => {
        match $self {
            $( ScalarType::$scalar => $coercion_family, )*
        }
    };
}
61
// Expands to a single `matches!` over the pair `($self, $value)`: the result is
// true when the pair fits `(ScalarType::$scalar, $value_pat)` for any registry
// entry, i.e. the value has the shape registered for that scalar.
// Takes two `@args` (the scalar and the value); all other entry columns are
// parsed but unused.
macro_rules! scalar_matches_value_from_registry {
    ( @args $self:expr, $value:expr; @entries $( ($scalar:ident, $coercion_family:expr, $value_pat:pat, is_numeric_value = $is_numeric:expr, supports_numeric_coercion = $supports_numeric_coercion:expr, supports_arithmetic = $supports_arithmetic:expr, supports_equality = $supports_equality:expr, supports_ordering = $supports_ordering:expr, is_keyable = $is_keyable:expr, is_storage_key_encodable = $is_storage_key_encodable:expr) ),* $(,)? ) => {
        matches!(
            ($self, $value),
            $( (ScalarType::$scalar, $value_pat) )|*
        )
    };
}
70
// Expands to a `match` on `$self` that maps every `ScalarType::$scalar` to the
// entry's `supports_numeric_coercion = ...` expression. All other entry
// columns are parsed but unused.
macro_rules! scalar_supports_numeric_coercion_from_registry {
    ( @args $self:expr; @entries $( ($scalar:ident, $coercion_family:expr, $value_pat:pat, is_numeric_value = $is_numeric:expr, supports_numeric_coercion = $supports_numeric_coercion:expr, supports_arithmetic = $supports_arithmetic:expr, supports_equality = $supports_equality:expr, supports_ordering = $supports_ordering:expr, is_keyable = $is_keyable:expr, is_storage_key_encodable = $is_storage_key_encodable:expr) ),* $(,)? ) => {
        match $self {
            $( ScalarType::$scalar => $supports_numeric_coercion, )*
        }
    };
}
78
// Expands to a `match` on `$self` that maps every `ScalarType::$scalar` to the
// entry's `is_keyable = ...` expression. All other entry columns are parsed
// but unused.
macro_rules! scalar_is_keyable_from_registry {
    ( @args $self:expr; @entries $( ($scalar:ident, $coercion_family:expr, $value_pat:pat, is_numeric_value = $is_numeric:expr, supports_numeric_coercion = $supports_numeric_coercion:expr, supports_arithmetic = $supports_arithmetic:expr, supports_equality = $supports_equality:expr, supports_ordering = $supports_ordering:expr, is_keyable = $is_keyable:expr, is_storage_key_encodable = $is_storage_key_encodable:expr) ),* $(,)? ) => {
        match $self {
            $( ScalarType::$scalar => $is_keyable, )*
        }
    };
}
86
// Expands to a `match` on `$self` that maps every `ScalarType::$scalar` to the
// entry's `supports_ordering = ...` expression. All other entry columns are
// parsed but unused.
macro_rules! scalar_supports_ordering_from_registry {
    ( @args $self:expr; @entries $( ($scalar:ident, $coercion_family:expr, $value_pat:pat, is_numeric_value = $is_numeric:expr, supports_numeric_coercion = $supports_numeric_coercion:expr, supports_arithmetic = $supports_arithmetic:expr, supports_equality = $supports_equality:expr, supports_ordering = $supports_ordering:expr, is_keyable = $is_keyable:expr, is_storage_key_encodable = $is_storage_key_encodable:expr) ),* $(,)? ) => {
        match $self {
            $( ScalarType::$scalar => $supports_ordering, )*
        }
    };
}
94
// Registry-backed capability lookups. Each method hands one of the local
// `*_from_registry` helper macros to `scalar_registry!`, so the answers come
// from the registry entries rather than from logic written in this block.
impl ScalarType {
    /// Returns the coercion family the registry records for this scalar.
    #[must_use]
    pub(crate) const fn coercion_family(&self) -> CoercionFamily {
        scalar_registry!(scalar_coercion_family_from_registry, self)
    }

    /// Returns whether ordering comparisons are allowed on this scalar.
    ///
    /// This is an alias for [`Self::supports_ordering`].
    #[must_use]
    pub(crate) const fn is_orderable(&self) -> bool {
        // Predicate-level ordering gate.
        // Delegates to registry-backed supports_ordering.
        self.supports_ordering()
    }

    /// Returns whether `value` has the value pattern registered for this scalar.
    #[must_use]
    pub(crate) const fn matches_value(&self, value: &Value) -> bool {
        scalar_registry!(scalar_matches_value_from_registry, self, value)
    }

    /// Returns the registry's `supports_numeric_coercion` flag for this scalar.
    #[must_use]
    pub(crate) const fn supports_numeric_coercion(&self) -> bool {
        scalar_registry!(scalar_supports_numeric_coercion_from_registry, self)
    }

    /// Returns the registry's `is_keyable` flag for this scalar.
    #[must_use]
    pub(crate) const fn is_keyable(&self) -> bool {
        scalar_registry!(scalar_is_keyable_from_registry, self)
    }

    /// Returns the registry's `supports_ordering` flag for this scalar.
    #[must_use]
    pub(crate) const fn supports_ordering(&self) -> bool {
        scalar_registry!(scalar_supports_ordering_from_registry, self)
    }
}
128
///
/// FieldType
///
/// Reduced runtime type representation used exclusively for predicate validation.
/// This intentionally drops:
/// - record structure
/// - tuple structure
/// - validator/sanitizer metadata
///
/// Container variants nest recursively, so a list of lists is
/// `List(List(Scalar(..)))`.
///

#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum FieldType {
    /// A single scalar value.
    Scalar(ScalarType),
    /// A list whose elements all have the boxed type.
    List(Box<Self>),
    /// A set whose elements all have the boxed type.
    Set(Box<Self>),
    /// A map with the given key and value types.
    Map { key: Box<Self>, value: Box<Self> },
    /// A structured value with no element typing; `queryable` is passed
    /// through unchanged by `value_kind`.
    Structured { queryable: bool },
}
147
148impl FieldType {
149    #[must_use]
150    pub(crate) const fn value_kind(&self) -> FieldValueKind {
151        match self {
152            Self::Scalar(_) => FieldValueKind::Atomic,
153            Self::List(_) | Self::Set(_) => FieldValueKind::Structured { queryable: true },
154            Self::Map { .. } => FieldValueKind::Structured { queryable: false },
155            Self::Structured { queryable } => FieldValueKind::Structured {
156                queryable: *queryable,
157            },
158        }
159    }
160
161    #[must_use]
162    pub(crate) const fn coercion_family(&self) -> Option<CoercionFamily> {
163        match self {
164            Self::Scalar(inner) => Some(inner.coercion_family()),
165            Self::List(_) | Self::Set(_) | Self::Map { .. } => Some(CoercionFamily::Collection),
166            Self::Structured { .. } => None,
167        }
168    }
169
170    #[must_use]
171    pub(crate) const fn is_text(&self) -> bool {
172        matches!(self, Self::Scalar(ScalarType::Text))
173    }
174
175    #[must_use]
176    pub(crate) const fn is_collection(&self) -> bool {
177        matches!(self, Self::List(_) | Self::Set(_) | Self::Map { .. })
178    }
179
180    #[must_use]
181    pub(crate) const fn is_list_like(&self) -> bool {
182        matches!(self, Self::List(_) | Self::Set(_))
183    }
184
185    #[must_use]
186    pub(crate) const fn is_orderable(&self) -> bool {
187        match self {
188            Self::Scalar(inner) => inner.is_orderable(),
189            _ => false,
190        }
191    }
192
193    #[must_use]
194    pub(crate) const fn is_keyable(&self) -> bool {
195        match self {
196            Self::Scalar(inner) => inner.is_keyable(),
197            _ => false,
198        }
199    }
200
201    #[must_use]
202    pub(crate) const fn supports_numeric_coercion(&self) -> bool {
203        match self {
204            Self::Scalar(inner) => inner.supports_numeric_coercion(),
205            _ => false,
206        }
207    }
208}
209
210pub(crate) fn literal_matches_type(literal: &Value, field_type: &FieldType) -> bool {
211    match field_type {
212        FieldType::Scalar(inner) => inner.matches_value(literal),
213        FieldType::List(element) | FieldType::Set(element) => match literal {
214            Value::List(items) => items.iter().all(|item| literal_matches_type(item, element)),
215            _ => false,
216        },
217        FieldType::Map { key, value } => match literal {
218            Value::Map(entries) => {
219                if Value::validate_map_entries(entries.as_slice()).is_err() {
220                    return false;
221                }
222
223                entries.iter().all(|(entry_key, entry_value)| {
224                    literal_matches_type(entry_key, key) && literal_matches_type(entry_value, value)
225                })
226            }
227            _ => false,
228        },
229        FieldType::Structured { .. } => {
230            // NOTE: non-queryable structured field types never match predicate literals.
231            false
232        }
233    }
234}
235
236pub(super) fn field_type_from_model_kind(kind: &FieldKind) -> FieldType {
237    match kind {
238        FieldKind::Account => FieldType::Scalar(ScalarType::Account),
239        FieldKind::Blob => FieldType::Scalar(ScalarType::Blob),
240        FieldKind::Bool => FieldType::Scalar(ScalarType::Bool),
241        FieldKind::Date => FieldType::Scalar(ScalarType::Date),
242        FieldKind::Decimal { .. } => FieldType::Scalar(ScalarType::Decimal),
243        FieldKind::Duration => FieldType::Scalar(ScalarType::Duration),
244        FieldKind::Enum { .. } => FieldType::Scalar(ScalarType::Enum),
245        FieldKind::Float32 => FieldType::Scalar(ScalarType::Float32),
246        FieldKind::Float64 => FieldType::Scalar(ScalarType::Float64),
247        FieldKind::Int => FieldType::Scalar(ScalarType::Int),
248        FieldKind::Int128 => FieldType::Scalar(ScalarType::Int128),
249        FieldKind::IntBig => FieldType::Scalar(ScalarType::IntBig),
250        FieldKind::Principal => FieldType::Scalar(ScalarType::Principal),
251        FieldKind::Subaccount => FieldType::Scalar(ScalarType::Subaccount),
252        FieldKind::Text => FieldType::Scalar(ScalarType::Text),
253        FieldKind::Timestamp => FieldType::Scalar(ScalarType::Timestamp),
254        FieldKind::Uint => FieldType::Scalar(ScalarType::Uint),
255        FieldKind::Uint128 => FieldType::Scalar(ScalarType::Uint128),
256        FieldKind::UintBig => FieldType::Scalar(ScalarType::UintBig),
257        FieldKind::Ulid => FieldType::Scalar(ScalarType::Ulid),
258        FieldKind::Unit => FieldType::Scalar(ScalarType::Unit),
259        FieldKind::Relation { key_kind, .. } => field_type_from_model_kind(key_kind),
260        FieldKind::List(inner) => FieldType::List(Box::new(field_type_from_model_kind(inner))),
261        FieldKind::Set(inner) => FieldType::Set(Box::new(field_type_from_model_kind(inner))),
262        FieldKind::Map { key, value } => FieldType::Map {
263            key: Box::new(field_type_from_model_kind(key)),
264            value: Box::new(field_type_from_model_kind(value)),
265        },
266        FieldKind::Structured { queryable } => FieldType::Structured {
267            queryable: *queryable,
268        },
269    }
270}
271
/// Renders a compact, human-readable type name.
///
/// Scalars print the `Debug` name of their `ScalarType` variant. Containers
/// print their element types recursively through this same implementation,
/// e.g. `List<Text>` or `Map<Text, Int>`. Structured fields print
/// `Structured<queryable=true>` or `Structured<queryable=false>`.
impl fmt::Display for FieldType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // `{inner:?}` — scalars have no `Display` use here; the derived
            // `Debug` output is the variant name.
            Self::Scalar(inner) => write!(f, "{inner:?}"),
            Self::List(inner) => write!(f, "List<{inner}>"),
            Self::Set(inner) => write!(f, "Set<{inner}>"),
            Self::Map { key, value } => write!(f, "Map<{key}, {value}>"),
            Self::Structured { queryable } => {
                write!(f, "Structured<queryable={queryable}>")
            }
        }
    }
}
285
286fn validate_index_fields(
287    fields: &BTreeMap<String, FieldType>,
288    indexes: &[&IndexModel],
289) -> Result<(), ValidateError> {
290    let mut seen_names = BTreeSet::new();
291    for index in indexes {
292        if seen_names.contains(index.name) {
293            return Err(ValidateError::DuplicateIndexName {
294                name: index.name.to_string(),
295            });
296        }
297        seen_names.insert(index.name);
298
299        let mut seen = BTreeSet::new();
300        for field in index.fields {
301            if !fields.contains_key(*field) {
302                return Err(ValidateError::IndexFieldUnknown {
303                    index: **index,
304                    field: (*field).to_string(),
305                });
306            }
307            if seen.contains(*field) {
308                return Err(ValidateError::IndexFieldDuplicate {
309                    index: **index,
310                    field: (*field).to_string(),
311                });
312            }
313            seen.insert(*field);
314
315            let field_type = fields
316                .get(*field)
317                .expect("index field existence checked above");
318            // Guardrail: map fields are deterministic stored values but remain
319            // non-queryable and non-indexable in 0.7.
320            if matches!(field_type, FieldType::Map { .. }) {
321                return Err(ValidateError::IndexFieldMapNotQueryable {
322                    index: **index,
323                    field: (*field).to_string(),
324                });
325            }
326            if !field_type.value_kind().is_queryable() {
327                return Err(ValidateError::IndexFieldNotQueryable {
328                    index: **index,
329                    field: (*field).to_string(),
330                });
331            }
332        }
333    }
334
335    Ok(())
336}
337
///
/// SchemaInfo
///
/// Lightweight, runtime-usable field-type map for one entity.
/// This is the *only* schema surface the predicate validator depends on.
///
/// Both maps are keyed by field name and are filled together, one entry per
/// model field, by `SchemaInfo::from_entity_model`.
///

#[derive(Clone, Debug)]
pub(crate) struct SchemaInfo {
    // Field name -> reduced predicate type (see `field_type_from_model_kind`).
    fields: BTreeMap<String, FieldType>,
    // Field name -> the model's original, unreduced kind.
    field_kinds: BTreeMap<String, FieldKind>,
}
350
351impl SchemaInfo {
352    #[must_use]
353    pub(crate) fn field(&self, name: &str) -> Option<&FieldType> {
354        self.fields.get(name)
355    }
356
357    #[must_use]
358    pub(crate) fn field_kind(&self, name: &str) -> Option<&FieldKind> {
359        self.field_kinds.get(name)
360    }
361
362    /// Builds runtime predicate schema information from an entity model.
363    pub(crate) fn from_entity_model(model: &EntityModel) -> Result<Self, ValidateError> {
364        // Validate identity constraints before building schema maps.
365        let entity_name = EntityName::try_from_str(model.entity_name).map_err(|err| {
366            ValidateError::InvalidEntityName {
367                name: model.entity_name.to_string(),
368                source: err,
369            }
370        })?;
371
372        if !model
373            .fields
374            .iter()
375            .any(|field| std::ptr::eq(field, model.primary_key))
376        {
377            return Err(ValidateError::InvalidPrimaryKey {
378                field: model.primary_key.name.to_string(),
379            });
380        }
381
382        let mut fields = BTreeMap::new();
383        let mut field_kinds = BTreeMap::new();
384        for field in model.fields {
385            if fields.contains_key(field.name) {
386                return Err(ValidateError::DuplicateField {
387                    field: field.name.to_string(),
388                });
389            }
390            let ty = field_type_from_model_kind(&field.kind);
391            fields.insert(field.name.to_string(), ty);
392            field_kinds.insert(field.name.to_string(), field.kind);
393        }
394
395        let pk_field_type = fields
396            .get(model.primary_key.name)
397            .expect("primary key verified above");
398        if !pk_field_type.is_keyable() {
399            return Err(ValidateError::InvalidPrimaryKeyType {
400                field: model.primary_key.name.to_string(),
401            });
402        }
403
404        validate_index_fields(&fields, model.indexes)?;
405        for index in model.indexes {
406            IndexName::try_from_parts(&entity_name, index.fields).map_err(|err| {
407                ValidateError::InvalidIndexName {
408                    index: **index,
409                    source: err,
410                }
411            })?;
412        }
413
414        Ok(Self {
415            fields,
416            field_kinds,
417        })
418    }
419}
420
/// Predicate/schema validation failures, including invalid model contracts.
///
/// The variants fall into two groups: model-contract errors raised while
/// building a `SchemaInfo` (entity/index names, primary key, duplicate and
/// index-field problems) and predicate errors raised while validating a
/// query (unknown or non-queryable fields, bad operators, coercions, and
/// literals).
#[derive(Debug, thiserror::Error)]
pub enum ValidateError {
    /// The model's entity name was rejected by `EntityName::try_from_str`.
    #[error("invalid entity name '{name}': {source}")]
    InvalidEntityName {
        name: String,
        #[source]
        source: EntityNameError,
    },

    /// `IndexName::try_from_parts` rejected the name derived for an index.
    #[error("invalid index name for '{index}': {source}")]
    InvalidIndexName {
        index: IndexModel,
        #[source]
        source: IndexNameError,
    },

    /// A predicate referenced a field the schema does not contain.
    #[error("unknown field '{field}'")]
    UnknownField { field: String },

    /// A predicate referenced a field whose value kind is not queryable, or
    /// whose type has no coercion family.
    #[error("field '{field}' is not queryable")]
    NonQueryableFieldType { field: String },

    /// The model declares the same field name more than once.
    #[error("duplicate field '{field}'")]
    DuplicateField { field: String },

    /// A policy-level unsupported feature (for example a predicate on a map
    /// field); converted automatically from `UnsupportedQueryFeature`.
    #[error("{0}")]
    UnsupportedQueryFeature(#[from] UnsupportedQueryFeature),

    /// The model's primary key is not one of the entries in its field list.
    #[error("primary key '{field}' not present in entity fields")]
    InvalidPrimaryKey { field: String },

    /// The primary key's type is not keyable.
    #[error("primary key '{field}' has a non-keyable type")]
    InvalidPrimaryKeyType { field: String },

    /// An index names a field that the entity does not declare.
    #[error("index '{index}' references unknown field '{field}'")]
    IndexFieldUnknown { index: IndexModel, field: String },

    /// An index names a field whose value kind is not queryable.
    #[error("index '{index}' references non-queryable field '{field}'")]
    IndexFieldNotQueryable { index: IndexModel, field: String },

    /// An index names a map field.
    #[error(
        "index '{index}' references map field '{field}'; map fields are not queryable in icydb 0.7"
    )]
    IndexFieldMapNotQueryable { index: IndexModel, field: String },

    /// An index lists the same field more than once.
    #[error("index '{index}' repeats field '{field}'")]
    IndexFieldDuplicate { index: IndexModel, field: String },

    /// Two indexes on the same entity share a name.
    #[error("duplicate index name '{name}'")]
    DuplicateIndexName { name: String },

    /// The operator cannot be applied to the field's type; `op` is the
    /// operator rendered as text.
    #[error("operator {op} is not valid for field '{field}'")]
    InvalidOperator { field: String, op: String },

    /// The requested coercion is not allowed for this field/literal pairing.
    #[error("coercion {coercion:?} is not valid for field '{field}'")]
    InvalidCoercion { field: String, coercion: CoercionId },

    /// The literal has the wrong shape or type for the field and operator.
    #[error("invalid literal for field '{field}': {message}")]
    InvalidLiteral { field: String, message: String },
}
482
483impl ValidateError {
484    pub(crate) fn invalid_operator(field: &str, op: impl fmt::Display) -> Self {
485        Self::InvalidOperator {
486            field: field.to_string(),
487            op: op.to_string(),
488        }
489    }
490
491    pub(crate) fn invalid_literal(field: &str, msg: &str) -> Self {
492        Self::InvalidLiteral {
493            field: field.to_string(),
494            message: msg.to_string(),
495        }
496    }
497}
498
499/// Reject policy-level non-queryable features before planning.
500pub(crate) fn reject_unsupported_query_features(
501    predicate: &Predicate,
502) -> Result<(), UnsupportedQueryFeature> {
503    match predicate {
504        Predicate::True
505        | Predicate::False
506        | Predicate::Compare(_)
507        | Predicate::IsNull { .. }
508        | Predicate::IsMissing { .. }
509        | Predicate::IsEmpty { .. }
510        | Predicate::IsNotEmpty { .. }
511        | Predicate::TextContains { .. }
512        | Predicate::TextContainsCi { .. } => Ok(()),
513        Predicate::And(children) | Predicate::Or(children) => {
514            for child in children {
515                reject_unsupported_query_features(child)?;
516            }
517
518            Ok(())
519        }
520        Predicate::Not(inner) => reject_unsupported_query_features(inner),
521    }
522}
523
524/// Validates a predicate against the provided schema information.
525pub(crate) fn validate(schema: &SchemaInfo, predicate: &Predicate) -> Result<(), ValidateError> {
526    reject_unsupported_query_features(predicate)?;
527
528    match predicate {
529        Predicate::True | Predicate::False => Ok(()),
530        Predicate::And(children) | Predicate::Or(children) => {
531            for child in children {
532                validate(schema, child)?;
533            }
534            Ok(())
535        }
536        Predicate::Not(inner) => validate(schema, inner),
537        Predicate::Compare(cmp) => validate_compare(schema, cmp),
538        Predicate::IsNull { field } | Predicate::IsMissing { field } => {
539            let _field_type = ensure_field(schema, field)?;
540            Ok(())
541        }
542        Predicate::IsEmpty { field } => {
543            let field_type = ensure_field(schema, field)?;
544            if field_type.is_text() || field_type.is_collection() {
545                Ok(())
546            } else {
547                Err(ValidateError::invalid_operator(field, "is_empty"))
548            }
549        }
550        Predicate::IsNotEmpty { field } => {
551            let field_type = ensure_field(schema, field)?;
552            if field_type.is_text() || field_type.is_collection() {
553                Ok(())
554            } else {
555                Err(ValidateError::invalid_operator(field, "is_not_empty"))
556            }
557        }
558        Predicate::TextContains { field, value } => {
559            validate_text_contains(schema, field, value, "text_contains")
560        }
561        Predicate::TextContainsCi { field, value } => {
562            validate_text_contains(schema, field, value, "text_contains_ci")
563        }
564    }
565}
566
567fn validate_compare(schema: &SchemaInfo, cmp: &ComparePredicate) -> Result<(), ValidateError> {
568    let field_type = ensure_field(schema, &cmp.field)?;
569
570    match cmp.op {
571        CompareOp::Eq | CompareOp::Ne => {
572            validate_eq_ne(&cmp.field, field_type, &cmp.value, &cmp.coercion)
573        }
574        CompareOp::Lt | CompareOp::Lte | CompareOp::Gt | CompareOp::Gte => {
575            validate_ordering(&cmp.field, field_type, &cmp.value, &cmp.coercion, cmp.op)
576        }
577        CompareOp::In | CompareOp::NotIn => {
578            validate_in(&cmp.field, field_type, &cmp.value, &cmp.coercion, cmp.op)
579        }
580        CompareOp::Contains => validate_contains(&cmp.field, field_type, &cmp.value, &cmp.coercion),
581        CompareOp::StartsWith | CompareOp::EndsWith => {
582            validate_text_compare(&cmp.field, field_type, &cmp.value, &cmp.coercion, cmp.op)
583        }
584    }
585}
586
587fn validate_eq_ne(
588    field: &str,
589    field_type: &FieldType,
590    value: &Value,
591    coercion: &CoercionSpec,
592) -> Result<(), ValidateError> {
593    if field_type.is_list_like() {
594        ensure_list_literal(field, value, field_type)?;
595    } else {
596        ensure_scalar_literal(field, value)?;
597    }
598
599    ensure_coercion(field, field_type, value, coercion)
600}
601
602fn validate_ordering(
603    field: &str,
604    field_type: &FieldType,
605    value: &Value,
606    coercion: &CoercionSpec,
607    op: CompareOp,
608) -> Result<(), ValidateError> {
609    if matches!(coercion.id, CoercionId::CollectionElement) {
610        return Err(ValidateError::InvalidCoercion {
611            field: field.to_string(),
612            coercion: coercion.id,
613        });
614    }
615
616    if !field_type.is_orderable() {
617        return Err(ValidateError::invalid_operator(field, format!("{op:?}")));
618    }
619
620    ensure_scalar_literal(field, value)?;
621
622    ensure_coercion(field, field_type, value, coercion)
623}
624
625/// Validate list membership predicates.
626fn validate_in(
627    field: &str,
628    field_type: &FieldType,
629    value: &Value,
630    coercion: &CoercionSpec,
631    op: CompareOp,
632) -> Result<(), ValidateError> {
633    if field_type.is_collection() {
634        return Err(ValidateError::invalid_operator(field, format!("{op:?}")));
635    }
636
637    let Value::List(items) = value else {
638        return Err(ValidateError::invalid_literal(
639            field,
640            "expected list literal",
641        ));
642    };
643
644    for item in items {
645        ensure_coercion(field, field_type, item, coercion)?;
646    }
647
648    Ok(())
649}
650
651/// Validate collection containment predicates on list/set fields.
652fn validate_contains(
653    field: &str,
654    field_type: &FieldType,
655    value: &Value,
656    coercion: &CoercionSpec,
657) -> Result<(), ValidateError> {
658    if field_type.is_text() {
659        // CONTRACT: text substring matching uses TextContains/TextContainsCi only.
660        return Err(ValidateError::invalid_operator(
661            field,
662            format!("{:?}", CompareOp::Contains),
663        ));
664    }
665
666    let element_type = match field_type {
667        FieldType::List(inner) | FieldType::Set(inner) => inner.as_ref(),
668        _ => {
669            return Err(ValidateError::invalid_operator(
670                field,
671                format!("{:?}", CompareOp::Contains),
672            ));
673        }
674    };
675
676    if matches!(coercion.id, CoercionId::TextCasefold) {
677        // CONTRACT: case-insensitive coercion never applies to structured values.
678        return Err(ValidateError::InvalidCoercion {
679            field: field.to_string(),
680            coercion: coercion.id,
681        });
682    }
683
684    ensure_coercion(field, element_type, value, coercion)
685}
686
687/// Validate text prefix/suffix comparisons.
688fn validate_text_compare(
689    field: &str,
690    field_type: &FieldType,
691    value: &Value,
692    coercion: &CoercionSpec,
693    op: CompareOp,
694) -> Result<(), ValidateError> {
695    if !field_type.is_text() {
696        return Err(ValidateError::invalid_operator(field, format!("{op:?}")));
697    }
698
699    ensure_text_literal(field, value)?;
700
701    ensure_coercion(field, field_type, value, coercion)
702}
703
704/// Validate substring predicates on text fields.
705fn validate_text_contains(
706    schema: &SchemaInfo,
707    field: &str,
708    value: &Value,
709    op: &str,
710) -> Result<(), ValidateError> {
711    let field_type = ensure_field(schema, field)?;
712    if !field_type.is_text() {
713        return Err(ValidateError::invalid_operator(field, op));
714    }
715
716    ensure_text_literal(field, value)?;
717
718    Ok(())
719}
720
721fn ensure_field<'a>(schema: &'a SchemaInfo, field: &str) -> Result<&'a FieldType, ValidateError> {
722    let field_type = schema
723        .field(field)
724        .ok_or_else(|| ValidateError::UnknownField {
725            field: field.to_string(),
726        })?;
727
728    if matches!(field_type, FieldType::Map { .. }) {
729        return Err(UnsupportedQueryFeature::MapPredicate {
730            field: field.to_string(),
731        }
732        .into());
733    }
734
735    if !field_type.value_kind().is_queryable() {
736        return Err(ValidateError::NonQueryableFieldType {
737            field: field.to_string(),
738        });
739    }
740
741    Ok(field_type)
742}
743
744// Ensure the literal is text to match text-only operators.
745fn ensure_text_literal(field: &str, value: &Value) -> Result<(), ValidateError> {
746    if !matches!(value, Value::Text(_)) {
747        return Err(ValidateError::invalid_literal(
748            field,
749            "expected text literal",
750        ));
751    }
752
753    Ok(())
754}
755
756// Reject list literals when scalar comparisons are required.
757fn ensure_scalar_literal(field: &str, value: &Value) -> Result<(), ValidateError> {
758    if matches!(value, Value::List(_)) {
759        return Err(ValidateError::invalid_literal(
760            field,
761            "expected scalar literal",
762        ));
763    }
764
765    Ok(())
766}
767
768fn ensure_coercion(
769    field: &str,
770    field_type: &FieldType,
771    literal: &Value,
772    coercion: &CoercionSpec,
773) -> Result<(), ValidateError> {
774    if matches!(coercion.id, CoercionId::TextCasefold) && !field_type.is_text() {
775        // CONTRACT: case-insensitive coercions are text-only.
776        return Err(ValidateError::InvalidCoercion {
777            field: field.to_string(),
778            coercion: coercion.id,
779        });
780    }
781
782    // NOTE:
783    // NumericWiden eligibility is registry-authoritative.
784    // CoercionFamily::Numeric is intentionally NOT sufficient.
785    // This prevents validation/runtime divergence for Date, IntBig, UintBig.
786    if matches!(coercion.id, CoercionId::NumericWiden)
787        && (!field_type.supports_numeric_coercion() || !literal.supports_numeric_coercion())
788    {
789        return Err(ValidateError::InvalidCoercion {
790            field: field.to_string(),
791            coercion: coercion.id,
792        });
793    }
794
795    if !matches!(coercion.id, CoercionId::NumericWiden) {
796        let left_family =
797            field_type
798                .coercion_family()
799                .ok_or_else(|| ValidateError::NonQueryableFieldType {
800                    field: field.to_string(),
801                })?;
802        let right_family = literal.coercion_family();
803
804        if !supports_coercion(left_family, right_family, coercion.id) {
805            return Err(ValidateError::InvalidCoercion {
806                field: field.to_string(),
807                coercion: coercion.id,
808            });
809        }
810    }
811
812    if matches!(
813        coercion.id,
814        CoercionId::Strict | CoercionId::CollectionElement
815    ) && !literal_matches_type(literal, field_type)
816    {
817        return Err(ValidateError::invalid_literal(
818            field,
819            "literal type does not match field type",
820        ));
821    }
822
823    Ok(())
824}
825
826fn ensure_list_literal(
827    field: &str,
828    literal: &Value,
829    field_type: &FieldType,
830) -> Result<(), ValidateError> {
831    if !literal_matches_type(literal, field_type) {
832        return Err(ValidateError::invalid_literal(
833            field,
834            "list literal does not match field element type",
835        ));
836    }
837
838    Ok(())
839}