Skip to main content

icydb_core/db/predicate/
schema.rs

1use crate::{
2    db::{
3        identity::{EntityName, EntityNameError, IndexName, IndexNameError},
4        predicate::{
5            CoercionId, CoercionSpec, CompareOp, ComparePredicate, Predicate,
6            model::UnsupportedQueryFeature, supports_coercion,
7        },
8    },
9    model::{entity::EntityModel, field::FieldKind, index::IndexModel},
10    traits::FieldValueKind,
11    value::{CoercionFamily, CoercionFamilyExt, Value},
12};
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt;
15
16///
17/// ScalarType
18///
19/// Internal scalar classification used by predicate validation.
20/// This is deliberately *smaller* than the full schema/type system
21/// and exists only to support:
22/// - coercion rules
23/// - literal compatibility checks
24/// - operator validity (ordering, equality)
25///
26
27#[derive(Clone, Debug, Eq, PartialEq)]
28pub(crate) enum ScalarType {
29    Account,
30    Blob,
31    Bool,
32    Date,
33    Decimal,
34    Duration,
35    Enum,
36    Float32,
37    Float64,
38    Int,
39    Int128,
40    IntBig,
41    Principal,
42    Subaccount,
43    Text,
44    Timestamp,
45    Uint,
46    Uint128,
47    UintBig,
48    Ulid,
49    Unit,
50}
51
52// Local helpers to expand the scalar registry into match arms.
53macro_rules! scalar_coercion_family_from_registry {
54    ( @args $self:expr; @entries $( ($scalar:ident, $coercion_family:expr, $value_pat:pat, is_numeric_value = $is_numeric:expr, supports_numeric_coercion = $supports_numeric_coercion:expr, supports_arithmetic = $supports_arithmetic:expr, supports_equality = $supports_equality:expr, supports_ordering = $supports_ordering:expr, is_keyable = $is_keyable:expr, is_storage_key_encodable = $is_storage_key_encodable:expr) ),* $(,)? ) => {
55        match $self {
56            $( ScalarType::$scalar => $coercion_family, )*
57        }
58    };
59}
60
61macro_rules! scalar_matches_value_from_registry {
62    ( @args $self:expr, $value:expr; @entries $( ($scalar:ident, $coercion_family:expr, $value_pat:pat, is_numeric_value = $is_numeric:expr, supports_numeric_coercion = $supports_numeric_coercion:expr, supports_arithmetic = $supports_arithmetic:expr, supports_equality = $supports_equality:expr, supports_ordering = $supports_ordering:expr, is_keyable = $is_keyable:expr, is_storage_key_encodable = $is_storage_key_encodable:expr) ),* $(,)? ) => {
63        matches!(
64            ($self, $value),
65            $( (ScalarType::$scalar, $value_pat) )|*
66        )
67    };
68}
69
70macro_rules! scalar_supports_numeric_coercion_from_registry {
71    ( @args $self:expr; @entries $( ($scalar:ident, $coercion_family:expr, $value_pat:pat, is_numeric_value = $is_numeric:expr, supports_numeric_coercion = $supports_numeric_coercion:expr, supports_arithmetic = $supports_arithmetic:expr, supports_equality = $supports_equality:expr, supports_ordering = $supports_ordering:expr, is_keyable = $is_keyable:expr, is_storage_key_encodable = $is_storage_key_encodable:expr) ),* $(,)? ) => {
72        match $self {
73            $( ScalarType::$scalar => $supports_numeric_coercion, )*
74        }
75    };
76}
77
78macro_rules! scalar_is_keyable_from_registry {
79    ( @args $self:expr; @entries $( ($scalar:ident, $coercion_family:expr, $value_pat:pat, is_numeric_value = $is_numeric:expr, supports_numeric_coercion = $supports_numeric_coercion:expr, supports_arithmetic = $supports_arithmetic:expr, supports_equality = $supports_equality:expr, supports_ordering = $supports_ordering:expr, is_keyable = $is_keyable:expr, is_storage_key_encodable = $is_storage_key_encodable:expr) ),* $(,)? ) => {
80        match $self {
81            $( ScalarType::$scalar => $is_keyable, )*
82        }
83    };
84}
85
86macro_rules! scalar_supports_ordering_from_registry {
87    ( @args $self:expr; @entries $( ($scalar:ident, $coercion_family:expr, $value_pat:pat, is_numeric_value = $is_numeric:expr, supports_numeric_coercion = $supports_numeric_coercion:expr, supports_arithmetic = $supports_arithmetic:expr, supports_equality = $supports_equality:expr, supports_ordering = $supports_ordering:expr, is_keyable = $is_keyable:expr, is_storage_key_encodable = $is_storage_key_encodable:expr) ),* $(,)? ) => {
88        match $self {
89            $( ScalarType::$scalar => $supports_ordering, )*
90        }
91    };
92}
93
94impl ScalarType {
95    #[must_use]
96    pub(crate) const fn coercion_family(&self) -> CoercionFamily {
97        scalar_registry!(scalar_coercion_family_from_registry, self)
98    }
99
100    #[must_use]
101    pub(crate) const fn is_orderable(&self) -> bool {
102        // Predicate-level ordering gate.
103        // Delegates to registry-backed supports_ordering.
104        self.supports_ordering()
105    }
106
107    #[must_use]
108    pub(crate) const fn matches_value(&self, value: &Value) -> bool {
109        scalar_registry!(scalar_matches_value_from_registry, self, value)
110    }
111
112    #[must_use]
113    pub(crate) const fn supports_numeric_coercion(&self) -> bool {
114        scalar_registry!(scalar_supports_numeric_coercion_from_registry, self)
115    }
116
117    #[must_use]
118    pub(crate) const fn is_keyable(&self) -> bool {
119        scalar_registry!(scalar_is_keyable_from_registry, self)
120    }
121
122    #[must_use]
123    pub(crate) const fn supports_ordering(&self) -> bool {
124        scalar_registry!(scalar_supports_ordering_from_registry, self)
125    }
126}
127
128///
129/// FieldType
130///
131/// Reduced runtime type representation used exclusively for predicate validation.
132/// This intentionally drops:
133/// - record structure
134/// - tuple structure
135/// - validator/sanitizer metadata
136///
137
138#[derive(Clone, Debug, Eq, PartialEq)]
139pub(crate) enum FieldType {
140    Scalar(ScalarType),
141    List(Box<Self>),
142    Set(Box<Self>),
143    Map { key: Box<Self>, value: Box<Self> },
144    Structured { queryable: bool },
145}
146
147impl FieldType {
148    #[must_use]
149    pub(crate) const fn value_kind(&self) -> FieldValueKind {
150        match self {
151            Self::Scalar(_) => FieldValueKind::Atomic,
152            Self::List(_) | Self::Set(_) => FieldValueKind::Structured { queryable: true },
153            Self::Map { .. } => FieldValueKind::Structured { queryable: false },
154            Self::Structured { queryable } => FieldValueKind::Structured {
155                queryable: *queryable,
156            },
157        }
158    }
159
160    #[must_use]
161    pub(crate) const fn coercion_family(&self) -> Option<CoercionFamily> {
162        match self {
163            Self::Scalar(inner) => Some(inner.coercion_family()),
164            Self::List(_) | Self::Set(_) | Self::Map { .. } => Some(CoercionFamily::Collection),
165            Self::Structured { .. } => None,
166        }
167    }
168
169    #[must_use]
170    pub(crate) const fn is_text(&self) -> bool {
171        matches!(self, Self::Scalar(ScalarType::Text))
172    }
173
174    #[must_use]
175    pub(crate) const fn is_collection(&self) -> bool {
176        matches!(self, Self::List(_) | Self::Set(_) | Self::Map { .. })
177    }
178
179    #[must_use]
180    pub(crate) const fn is_list_like(&self) -> bool {
181        matches!(self, Self::List(_) | Self::Set(_))
182    }
183
184    #[must_use]
185    pub(crate) const fn is_orderable(&self) -> bool {
186        match self {
187            Self::Scalar(inner) => inner.is_orderable(),
188            _ => false,
189        }
190    }
191
192    #[must_use]
193    pub(crate) const fn is_keyable(&self) -> bool {
194        match self {
195            Self::Scalar(inner) => inner.is_keyable(),
196            _ => false,
197        }
198    }
199
200    #[must_use]
201    pub(crate) const fn supports_numeric_coercion(&self) -> bool {
202        match self {
203            Self::Scalar(inner) => inner.supports_numeric_coercion(),
204            _ => false,
205        }
206    }
207}
208
209pub(crate) fn literal_matches_type(literal: &Value, field_type: &FieldType) -> bool {
210    match field_type {
211        FieldType::Scalar(inner) => inner.matches_value(literal),
212        FieldType::List(element) | FieldType::Set(element) => match literal {
213            Value::List(items) => items.iter().all(|item| literal_matches_type(item, element)),
214            _ => false,
215        },
216        FieldType::Map { key, value } => match literal {
217            Value::Map(entries) => {
218                if Value::validate_map_entries(entries.as_slice()).is_err() {
219                    return false;
220                }
221
222                entries.iter().all(|(entry_key, entry_value)| {
223                    literal_matches_type(entry_key, key) && literal_matches_type(entry_value, value)
224                })
225            }
226            _ => false,
227        },
228        FieldType::Structured { .. } => {
229            // NOTE: non-queryable structured field types never match predicate literals.
230            false
231        }
232    }
233}
234
235pub(super) fn field_type_from_model_kind(kind: &FieldKind) -> FieldType {
236    match kind {
237        FieldKind::Account => FieldType::Scalar(ScalarType::Account),
238        FieldKind::Blob => FieldType::Scalar(ScalarType::Blob),
239        FieldKind::Bool => FieldType::Scalar(ScalarType::Bool),
240        FieldKind::Date => FieldType::Scalar(ScalarType::Date),
241        FieldKind::Decimal { .. } => FieldType::Scalar(ScalarType::Decimal),
242        FieldKind::Duration => FieldType::Scalar(ScalarType::Duration),
243        FieldKind::Enum { .. } => FieldType::Scalar(ScalarType::Enum),
244        FieldKind::Float32 => FieldType::Scalar(ScalarType::Float32),
245        FieldKind::Float64 => FieldType::Scalar(ScalarType::Float64),
246        FieldKind::Int => FieldType::Scalar(ScalarType::Int),
247        FieldKind::Int128 => FieldType::Scalar(ScalarType::Int128),
248        FieldKind::IntBig => FieldType::Scalar(ScalarType::IntBig),
249        FieldKind::Principal => FieldType::Scalar(ScalarType::Principal),
250        FieldKind::Subaccount => FieldType::Scalar(ScalarType::Subaccount),
251        FieldKind::Text => FieldType::Scalar(ScalarType::Text),
252        FieldKind::Timestamp => FieldType::Scalar(ScalarType::Timestamp),
253        FieldKind::Uint => FieldType::Scalar(ScalarType::Uint),
254        FieldKind::Uint128 => FieldType::Scalar(ScalarType::Uint128),
255        FieldKind::UintBig => FieldType::Scalar(ScalarType::UintBig),
256        FieldKind::Ulid => FieldType::Scalar(ScalarType::Ulid),
257        FieldKind::Unit => FieldType::Scalar(ScalarType::Unit),
258        FieldKind::Relation { key_kind, .. } => field_type_from_model_kind(key_kind),
259        FieldKind::List(inner) => FieldType::List(Box::new(field_type_from_model_kind(inner))),
260        FieldKind::Set(inner) => FieldType::Set(Box::new(field_type_from_model_kind(inner))),
261        FieldKind::Map { key, value } => FieldType::Map {
262            key: Box::new(field_type_from_model_kind(key)),
263            value: Box::new(field_type_from_model_kind(value)),
264        },
265        FieldKind::Structured { queryable } => FieldType::Structured {
266            queryable: *queryable,
267        },
268    }
269}
270
271impl fmt::Display for FieldType {
272    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273        match self {
274            Self::Scalar(inner) => write!(f, "{inner:?}"),
275            Self::List(inner) => write!(f, "List<{inner}>"),
276            Self::Set(inner) => write!(f, "Set<{inner}>"),
277            Self::Map { key, value } => write!(f, "Map<{key}, {value}>"),
278            Self::Structured { queryable } => {
279                write!(f, "Structured<queryable={queryable}>")
280            }
281        }
282    }
283}
284
285fn validate_index_fields(
286    fields: &BTreeMap<String, FieldType>,
287    indexes: &[&IndexModel],
288) -> Result<(), ValidateError> {
289    let mut seen_names = BTreeSet::new();
290    for index in indexes {
291        if seen_names.contains(index.name) {
292            return Err(ValidateError::DuplicateIndexName {
293                name: index.name.to_string(),
294            });
295        }
296        seen_names.insert(index.name);
297
298        let mut seen = BTreeSet::new();
299        for field in index.fields {
300            if !fields.contains_key(*field) {
301                return Err(ValidateError::IndexFieldUnknown {
302                    index: **index,
303                    field: (*field).to_string(),
304                });
305            }
306            if seen.contains(*field) {
307                return Err(ValidateError::IndexFieldDuplicate {
308                    index: **index,
309                    field: (*field).to_string(),
310                });
311            }
312            seen.insert(*field);
313
314            let field_type = fields
315                .get(*field)
316                .expect("index field existence checked above");
317            // Guardrail: map fields are deterministic stored values but remain
318            // non-queryable and non-indexable in 0.7.
319            if matches!(field_type, FieldType::Map { .. }) {
320                return Err(ValidateError::IndexFieldMapNotQueryable {
321                    index: **index,
322                    field: (*field).to_string(),
323                });
324            }
325            if !field_type.value_kind().is_queryable() {
326                return Err(ValidateError::IndexFieldNotQueryable {
327                    index: **index,
328                    field: (*field).to_string(),
329                });
330            }
331        }
332    }
333
334    Ok(())
335}
336
337///
338/// SchemaInfo
339///
340/// Lightweight, runtime-usable field-type map for one entity.
341/// This is the *only* schema surface the predicate validator depends on.
342///
343
344#[derive(Clone, Debug)]
345pub(crate) struct SchemaInfo {
346    fields: BTreeMap<String, FieldType>,
347    field_kinds: BTreeMap<String, FieldKind>,
348}
349
350impl SchemaInfo {
351    #[must_use]
352    pub(crate) fn field(&self, name: &str) -> Option<&FieldType> {
353        self.fields.get(name)
354    }
355
356    #[must_use]
357    pub(crate) fn field_kind(&self, name: &str) -> Option<&FieldKind> {
358        self.field_kinds.get(name)
359    }
360
361    /// Builds runtime predicate schema information from an entity model.
362    pub(crate) fn from_entity_model(model: &EntityModel) -> Result<Self, ValidateError> {
363        // Validate identity constraints before building schema maps.
364        let entity_name = EntityName::try_from_str(model.entity_name).map_err(|err| {
365            ValidateError::InvalidEntityName {
366                name: model.entity_name.to_string(),
367                source: err,
368            }
369        })?;
370
371        if !model
372            .fields
373            .iter()
374            .any(|field| std::ptr::eq(field, model.primary_key))
375        {
376            return Err(ValidateError::InvalidPrimaryKey {
377                field: model.primary_key.name.to_string(),
378            });
379        }
380
381        let mut fields = BTreeMap::new();
382        let mut field_kinds = BTreeMap::new();
383        for field in model.fields {
384            if fields.contains_key(field.name) {
385                return Err(ValidateError::DuplicateField {
386                    field: field.name.to_string(),
387                });
388            }
389            let ty = field_type_from_model_kind(&field.kind);
390            fields.insert(field.name.to_string(), ty);
391            field_kinds.insert(field.name.to_string(), field.kind);
392        }
393
394        let pk_field_type = fields
395            .get(model.primary_key.name)
396            .expect("primary key verified above");
397        if !pk_field_type.is_keyable() {
398            return Err(ValidateError::InvalidPrimaryKeyType {
399                field: model.primary_key.name.to_string(),
400            });
401        }
402
403        validate_index_fields(&fields, model.indexes)?;
404        for index in model.indexes {
405            IndexName::try_from_parts(&entity_name, index.fields).map_err(|err| {
406                ValidateError::InvalidIndexName {
407                    index: **index,
408                    source: err,
409                }
410            })?;
411        }
412
413        Ok(Self {
414            fields,
415            field_kinds,
416        })
417    }
418}
419
420/// Predicate/schema validation failures, including invalid model contracts.
421#[derive(Debug, thiserror::Error)]
422pub enum ValidateError {
423    #[error("invalid entity name '{name}': {source}")]
424    InvalidEntityName {
425        name: String,
426        #[source]
427        source: EntityNameError,
428    },
429
430    #[error("invalid index name for '{index}': {source}")]
431    InvalidIndexName {
432        index: IndexModel,
433        #[source]
434        source: IndexNameError,
435    },
436
437    #[error("unknown field '{field}'")]
438    UnknownField { field: String },
439
440    #[error("field '{field}' is not queryable")]
441    NonQueryableFieldType { field: String },
442
443    #[error("duplicate field '{field}'")]
444    DuplicateField { field: String },
445
446    #[error("{0}")]
447    UnsupportedQueryFeature(#[from] UnsupportedQueryFeature),
448
449    #[error("primary key '{field}' not present in entity fields")]
450    InvalidPrimaryKey { field: String },
451
452    #[error("primary key '{field}' has a non-keyable type")]
453    InvalidPrimaryKeyType { field: String },
454
455    #[error("index '{index}' references unknown field '{field}'")]
456    IndexFieldUnknown { index: IndexModel, field: String },
457
458    #[error("index '{index}' references non-queryable field '{field}'")]
459    IndexFieldNotQueryable { index: IndexModel, field: String },
460
461    #[error(
462        "index '{index}' references map field '{field}'; map fields are not queryable in icydb 0.7"
463    )]
464    IndexFieldMapNotQueryable { index: IndexModel, field: String },
465
466    #[error("index '{index}' repeats field '{field}'")]
467    IndexFieldDuplicate { index: IndexModel, field: String },
468
469    #[error("duplicate index name '{name}'")]
470    DuplicateIndexName { name: String },
471
472    #[error("operator {op} is not valid for field '{field}'")]
473    InvalidOperator { field: String, op: String },
474
475    #[error("coercion {coercion:?} is not valid for field '{field}'")]
476    InvalidCoercion { field: String, coercion: CoercionId },
477
478    #[error("invalid literal for field '{field}': {message}")]
479    InvalidLiteral { field: String, message: String },
480}
481
482impl ValidateError {
483    pub(crate) fn invalid_operator(field: &str, op: impl fmt::Display) -> Self {
484        Self::InvalidOperator {
485            field: field.to_string(),
486            op: op.to_string(),
487        }
488    }
489
490    pub(crate) fn invalid_literal(field: &str, msg: &str) -> Self {
491        Self::InvalidLiteral {
492            field: field.to_string(),
493            message: msg.to_string(),
494        }
495    }
496}
497
498/// Reject policy-level non-queryable features before planning.
499pub(crate) fn reject_unsupported_query_features(
500    predicate: &Predicate,
501) -> Result<(), UnsupportedQueryFeature> {
502    match predicate {
503        Predicate::True
504        | Predicate::False
505        | Predicate::Compare(_)
506        | Predicate::IsNull { .. }
507        | Predicate::IsMissing { .. }
508        | Predicate::IsEmpty { .. }
509        | Predicate::IsNotEmpty { .. }
510        | Predicate::TextContains { .. }
511        | Predicate::TextContainsCi { .. } => Ok(()),
512        Predicate::And(children) | Predicate::Or(children) => {
513            for child in children {
514                reject_unsupported_query_features(child)?;
515            }
516
517            Ok(())
518        }
519        Predicate::Not(inner) => reject_unsupported_query_features(inner),
520    }
521}
522
523/// Validates a predicate against the provided schema information.
524pub(crate) fn validate(schema: &SchemaInfo, predicate: &Predicate) -> Result<(), ValidateError> {
525    reject_unsupported_query_features(predicate)?;
526
527    match predicate {
528        Predicate::True | Predicate::False => Ok(()),
529        Predicate::And(children) | Predicate::Or(children) => {
530            for child in children {
531                validate(schema, child)?;
532            }
533            Ok(())
534        }
535        Predicate::Not(inner) => validate(schema, inner),
536        Predicate::Compare(cmp) => validate_compare(schema, cmp),
537        Predicate::IsNull { field } | Predicate::IsMissing { field } => {
538            let _field_type = ensure_field(schema, field)?;
539            Ok(())
540        }
541        Predicate::IsEmpty { field } => {
542            let field_type = ensure_field(schema, field)?;
543            if field_type.is_text() || field_type.is_collection() {
544                Ok(())
545            } else {
546                Err(ValidateError::invalid_operator(field, "is_empty"))
547            }
548        }
549        Predicate::IsNotEmpty { field } => {
550            let field_type = ensure_field(schema, field)?;
551            if field_type.is_text() || field_type.is_collection() {
552                Ok(())
553            } else {
554                Err(ValidateError::invalid_operator(field, "is_not_empty"))
555            }
556        }
557        Predicate::TextContains { field, value } => {
558            validate_text_contains(schema, field, value, "text_contains")
559        }
560        Predicate::TextContainsCi { field, value } => {
561            validate_text_contains(schema, field, value, "text_contains_ci")
562        }
563    }
564}
565
566fn validate_compare(schema: &SchemaInfo, cmp: &ComparePredicate) -> Result<(), ValidateError> {
567    let field_type = ensure_field(schema, &cmp.field)?;
568
569    match cmp.op {
570        CompareOp::Eq | CompareOp::Ne => {
571            validate_eq_ne(&cmp.field, field_type, &cmp.value, &cmp.coercion)
572        }
573        CompareOp::Lt | CompareOp::Lte | CompareOp::Gt | CompareOp::Gte => {
574            validate_ordering(&cmp.field, field_type, &cmp.value, &cmp.coercion, cmp.op)
575        }
576        CompareOp::In | CompareOp::NotIn => {
577            validate_in(&cmp.field, field_type, &cmp.value, &cmp.coercion, cmp.op)
578        }
579        CompareOp::Contains => validate_contains(&cmp.field, field_type, &cmp.value, &cmp.coercion),
580        CompareOp::StartsWith | CompareOp::EndsWith => {
581            validate_text_compare(&cmp.field, field_type, &cmp.value, &cmp.coercion, cmp.op)
582        }
583    }
584}
585
586fn validate_eq_ne(
587    field: &str,
588    field_type: &FieldType,
589    value: &Value,
590    coercion: &CoercionSpec,
591) -> Result<(), ValidateError> {
592    if field_type.is_list_like() {
593        ensure_list_literal(field, value, field_type)?;
594    } else {
595        ensure_scalar_literal(field, value)?;
596    }
597
598    ensure_coercion(field, field_type, value, coercion)
599}
600
601fn validate_ordering(
602    field: &str,
603    field_type: &FieldType,
604    value: &Value,
605    coercion: &CoercionSpec,
606    op: CompareOp,
607) -> Result<(), ValidateError> {
608    if matches!(coercion.id, CoercionId::CollectionElement) {
609        return Err(ValidateError::InvalidCoercion {
610            field: field.to_string(),
611            coercion: coercion.id,
612        });
613    }
614
615    if !field_type.is_orderable() {
616        return Err(ValidateError::invalid_operator(field, format!("{op:?}")));
617    }
618
619    ensure_scalar_literal(field, value)?;
620
621    ensure_coercion(field, field_type, value, coercion)
622}
623
624/// Validate list membership predicates.
625fn validate_in(
626    field: &str,
627    field_type: &FieldType,
628    value: &Value,
629    coercion: &CoercionSpec,
630    op: CompareOp,
631) -> Result<(), ValidateError> {
632    if field_type.is_collection() {
633        return Err(ValidateError::invalid_operator(field, format!("{op:?}")));
634    }
635
636    let Value::List(items) = value else {
637        return Err(ValidateError::invalid_literal(
638            field,
639            "expected list literal",
640        ));
641    };
642
643    for item in items {
644        ensure_coercion(field, field_type, item, coercion)?;
645    }
646
647    Ok(())
648}
649
650/// Validate collection containment predicates on list/set fields.
651fn validate_contains(
652    field: &str,
653    field_type: &FieldType,
654    value: &Value,
655    coercion: &CoercionSpec,
656) -> Result<(), ValidateError> {
657    if field_type.is_text() {
658        // CONTRACT: text substring matching uses TextContains/TextContainsCi only.
659        return Err(ValidateError::invalid_operator(
660            field,
661            format!("{:?}", CompareOp::Contains),
662        ));
663    }
664
665    let element_type = match field_type {
666        FieldType::List(inner) | FieldType::Set(inner) => inner.as_ref(),
667        _ => {
668            return Err(ValidateError::invalid_operator(
669                field,
670                format!("{:?}", CompareOp::Contains),
671            ));
672        }
673    };
674
675    if matches!(coercion.id, CoercionId::TextCasefold) {
676        // CONTRACT: case-insensitive coercion never applies to structured values.
677        return Err(ValidateError::InvalidCoercion {
678            field: field.to_string(),
679            coercion: coercion.id,
680        });
681    }
682
683    ensure_coercion(field, element_type, value, coercion)
684}
685
686/// Validate text prefix/suffix comparisons.
687fn validate_text_compare(
688    field: &str,
689    field_type: &FieldType,
690    value: &Value,
691    coercion: &CoercionSpec,
692    op: CompareOp,
693) -> Result<(), ValidateError> {
694    if !field_type.is_text() {
695        return Err(ValidateError::invalid_operator(field, format!("{op:?}")));
696    }
697
698    ensure_text_literal(field, value)?;
699
700    ensure_coercion(field, field_type, value, coercion)
701}
702
703/// Validate substring predicates on text fields.
704fn validate_text_contains(
705    schema: &SchemaInfo,
706    field: &str,
707    value: &Value,
708    op: &str,
709) -> Result<(), ValidateError> {
710    let field_type = ensure_field(schema, field)?;
711    if !field_type.is_text() {
712        return Err(ValidateError::invalid_operator(field, op));
713    }
714
715    ensure_text_literal(field, value)?;
716
717    Ok(())
718}
719
720fn ensure_field<'a>(schema: &'a SchemaInfo, field: &str) -> Result<&'a FieldType, ValidateError> {
721    let field_type = schema
722        .field(field)
723        .ok_or_else(|| ValidateError::UnknownField {
724            field: field.to_string(),
725        })?;
726
727    if matches!(field_type, FieldType::Map { .. }) {
728        return Err(UnsupportedQueryFeature::MapPredicate {
729            field: field.to_string(),
730        }
731        .into());
732    }
733
734    if !field_type.value_kind().is_queryable() {
735        return Err(ValidateError::NonQueryableFieldType {
736            field: field.to_string(),
737        });
738    }
739
740    Ok(field_type)
741}
742
743// Ensure the literal is text to match text-only operators.
744fn ensure_text_literal(field: &str, value: &Value) -> Result<(), ValidateError> {
745    if !matches!(value, Value::Text(_)) {
746        return Err(ValidateError::invalid_literal(
747            field,
748            "expected text literal",
749        ));
750    }
751
752    Ok(())
753}
754
755// Reject list literals when scalar comparisons are required.
756fn ensure_scalar_literal(field: &str, value: &Value) -> Result<(), ValidateError> {
757    if matches!(value, Value::List(_)) {
758        return Err(ValidateError::invalid_literal(
759            field,
760            "expected scalar literal",
761        ));
762    }
763
764    Ok(())
765}
766
767fn ensure_coercion(
768    field: &str,
769    field_type: &FieldType,
770    literal: &Value,
771    coercion: &CoercionSpec,
772) -> Result<(), ValidateError> {
773    if matches!(coercion.id, CoercionId::TextCasefold) && !field_type.is_text() {
774        // CONTRACT: case-insensitive coercions are text-only.
775        return Err(ValidateError::InvalidCoercion {
776            field: field.to_string(),
777            coercion: coercion.id,
778        });
779    }
780
781    // NOTE:
782    // NumericWiden eligibility is registry-authoritative.
783    // CoercionFamily::Numeric is intentionally NOT sufficient.
784    // This prevents validation/runtime divergence for Date, IntBig, UintBig.
785    if matches!(coercion.id, CoercionId::NumericWiden)
786        && (!field_type.supports_numeric_coercion() || !literal.supports_numeric_coercion())
787    {
788        return Err(ValidateError::InvalidCoercion {
789            field: field.to_string(),
790            coercion: coercion.id,
791        });
792    }
793
794    if !matches!(coercion.id, CoercionId::NumericWiden) {
795        let left_family =
796            field_type
797                .coercion_family()
798                .ok_or_else(|| ValidateError::NonQueryableFieldType {
799                    field: field.to_string(),
800                })?;
801        let right_family = literal.coercion_family();
802
803        if !supports_coercion(left_family, right_family, coercion.id) {
804            return Err(ValidateError::InvalidCoercion {
805                field: field.to_string(),
806                coercion: coercion.id,
807            });
808        }
809    }
810
811    if matches!(
812        coercion.id,
813        CoercionId::Strict | CoercionId::CollectionElement
814    ) && !literal_matches_type(literal, field_type)
815    {
816        return Err(ValidateError::invalid_literal(
817            field,
818            "literal type does not match field type",
819        ));
820    }
821
822    Ok(())
823}
824
825fn ensure_list_literal(
826    field: &str,
827    literal: &Value,
828    field_type: &FieldType,
829) -> Result<(), ValidateError> {
830    if !literal_matches_type(literal, field_type) {
831        return Err(ValidateError::invalid_literal(
832            field,
833            "list literal does not match field element type",
834        ));
835    }
836
837    Ok(())
838}