Skip to main content

icydb_core/db/query/predicate/
validate.rs

1use crate::{
2    db::identity::{EntityName, EntityNameError, IndexName, IndexNameError},
3    model::{entity::EntityModel, field::EntityFieldKind, index::IndexModel},
4    value::{Value, ValueFamily, ValueFamilyExt},
5};
6use icydb_schema::{
7    node::{
8        Entity, Enum, Item, ItemTarget, List, Map, Newtype, Record, Schema, Set, Tuple,
9        Value as SValue,
10    },
11    types::{Cardinality, Primitive},
12};
13use std::{
14    collections::{BTreeMap, BTreeSet},
15    fmt,
16};
17
18use super::{
19    ast::{CompareOp, ComparePredicate, Predicate},
20    coercion::{CoercionId, CoercionSpec, supports_coercion},
21};
22
23#[cfg(test)]
24use std::cell::Cell;
25
// Test-only, per-thread boolean flag, initialised to `false`.
// `reset_schema_lookup_called` clears it and `schema_lookup_called` reads it.
// NOTE(review): nothing in the visible part of this file sets it to `true`;
// presumably a schema-lookup path elsewhere does — confirm.
#[cfg(test)]
thread_local! {
    static SCHEMA_LOOKUP_CALLED: Cell<bool> = const { Cell::new(false) };
}
30
/// Test hook: puts the current thread's `SCHEMA_LOOKUP_CALLED` flag back to `false`.
#[cfg(test)]
pub(crate) fn reset_schema_lookup_called() {
    SCHEMA_LOOKUP_CALLED.set(false);
}
35
/// Test hook: returns the current thread's `SCHEMA_LOOKUP_CALLED` flag.
#[cfg(test)]
pub(crate) fn schema_lookup_called() -> bool {
    SCHEMA_LOOKUP_CALLED.get()
}
40
///
/// ScalarType
///
/// Internal scalar classification used by predicate validation.
/// This is deliberately *smaller* than the full schema/type system
/// and exists only to support:
/// - coercion rules
/// - literal compatibility checks
/// - operator validity (ordering, equality)
///
/// Every variant pairs with the `Value` variant of the same name
/// (see `ScalarType::matches_value`) and belongs to exactly one
/// `ValueFamily` (see `ScalarType::family`).
///

#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum ScalarType {
    Account,
    Blob,
    Bool,
    Date,
    Decimal,
    Duration,
    Enum,
    E8s,
    E18s,
    Float32,
    Float64,
    Int,
    Int128,
    IntBig,
    Principal,
    Subaccount,
    Text,
    Timestamp,
    Uint,
    Uint128,
    UintBig,
    Ulid,
    Unit,
}
78
79impl ScalarType {
80    #[must_use]
81    pub const fn family(&self) -> ValueFamily {
82        match self {
83            Self::Text => ValueFamily::Textual,
84            Self::Ulid | Self::Principal | Self::Account => ValueFamily::Identifier,
85            Self::Enum => ValueFamily::Enum,
86            Self::Blob | Self::Subaccount => ValueFamily::Blob,
87            Self::Bool => ValueFamily::Bool,
88            Self::Unit => ValueFamily::Unit,
89            Self::Date
90            | Self::Decimal
91            | Self::Duration
92            | Self::E8s
93            | Self::E18s
94            | Self::Float32
95            | Self::Float64
96            | Self::Int
97            | Self::Int128
98            | Self::IntBig
99            | Self::Timestamp
100            | Self::Uint
101            | Self::Uint128
102            | Self::UintBig => ValueFamily::Numeric,
103        }
104    }
105
106    #[must_use]
107    pub const fn is_orderable(&self) -> bool {
108        !matches!(self, Self::Blob | Self::Unit)
109    }
110
111    #[must_use]
112    pub const fn matches_value(&self, value: &Value) -> bool {
113        matches!(
114            (self, value),
115            (Self::Account, Value::Account(_))
116                | (Self::Blob, Value::Blob(_))
117                | (Self::Bool, Value::Bool(_))
118                | (Self::Date, Value::Date(_))
119                | (Self::Decimal, Value::Decimal(_))
120                | (Self::Duration, Value::Duration(_))
121                | (Self::Enum, Value::Enum(_))
122                | (Self::E8s, Value::E8s(_))
123                | (Self::E18s, Value::E18s(_))
124                | (Self::Float32, Value::Float32(_))
125                | (Self::Float64, Value::Float64(_))
126                | (Self::Int, Value::Int(_))
127                | (Self::Int128, Value::Int128(_))
128                | (Self::IntBig, Value::IntBig(_))
129                | (Self::Principal, Value::Principal(_))
130                | (Self::Subaccount, Value::Subaccount(_))
131                | (Self::Text, Value::Text(_))
132                | (Self::Timestamp, Value::Timestamp(_))
133                | (Self::Uint, Value::Uint(_))
134                | (Self::Uint128, Value::Uint128(_))
135                | (Self::UintBig, Value::UintBig(_))
136                | (Self::Ulid, Value::Ulid(_))
137                | (Self::Unit, Value::Unit)
138        )
139    }
140}
141
///
/// FieldType
///
/// Reduced runtime type representation used exclusively for predicate validation.
/// This intentionally drops:
/// - record structure
/// - tuple structure
/// - validator/sanitizer metadata
///

#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum FieldType {
    /// A single scalar value.
    Scalar(ScalarType),
    /// A list; the boxed type is the element type.
    List(Box<Self>),
    /// A set; the boxed type is the element type.
    Set(Box<Self>),
    /// A map with separately typed keys and values.
    Map { key: Box<Self>, value: Box<Self> },
    /// A type the validator cannot reason about; `ensure_field` rejects
    /// predicates that reference a field of this type.
    Unsupported,
}
160
161impl FieldType {
162    #[must_use]
163    pub const fn family(&self) -> Option<ValueFamily> {
164        match self {
165            Self::Scalar(inner) => Some(inner.family()),
166            Self::List(_) | Self::Set(_) | Self::Map { .. } => Some(ValueFamily::Collection),
167            Self::Unsupported => None,
168        }
169    }
170
171    #[must_use]
172    pub const fn is_text(&self) -> bool {
173        matches!(self, Self::Scalar(ScalarType::Text))
174    }
175
176    #[must_use]
177    pub const fn is_collection(&self) -> bool {
178        matches!(self, Self::List(_) | Self::Set(_) | Self::Map { .. })
179    }
180
181    #[must_use]
182    pub const fn is_list_like(&self) -> bool {
183        matches!(self, Self::List(_) | Self::Set(_))
184    }
185
186    #[must_use]
187    pub const fn is_map(&self) -> bool {
188        matches!(self, Self::Map { .. })
189    }
190
191    #[must_use]
192    pub fn element_type(&self) -> Option<&Self> {
193        match self {
194            Self::List(inner) | Self::Set(inner) => Some(inner),
195            _ => None,
196        }
197    }
198
199    #[must_use]
200    pub fn map_types(&self) -> Option<(&Self, &Self)> {
201        match self {
202            Self::Map { key, value } => Some((key.as_ref(), value.as_ref())),
203            _ => None,
204        }
205    }
206
207    #[must_use]
208    pub const fn is_orderable(&self) -> bool {
209        match self {
210            Self::Scalar(inner) => inner.is_orderable(),
211            _ => false,
212        }
213    }
214
215    #[must_use]
216    pub const fn is_keyable(&self) -> bool {
217        matches!(
218            self,
219            Self::Scalar(
220                ScalarType::Account
221                    | ScalarType::Int
222                    | ScalarType::Principal
223                    | ScalarType::Subaccount
224                    | ScalarType::Timestamp
225                    | ScalarType::Uint
226                    | ScalarType::Ulid
227                    | ScalarType::Unit
228            )
229        )
230    }
231}
232
233fn validate_index_fields(
234    fields: &BTreeMap<String, FieldType>,
235    indexes: &[&IndexModel],
236) -> Result<(), ValidateError> {
237    let mut seen_names = BTreeSet::new();
238    for index in indexes {
239        if seen_names.contains(index.name) {
240            return Err(ValidateError::DuplicateIndexName {
241                name: index.name.to_string(),
242            });
243        }
244        seen_names.insert(index.name);
245
246        let mut seen = BTreeSet::new();
247        for field in index.fields {
248            if !fields.contains_key(*field) {
249                return Err(ValidateError::IndexFieldUnknown {
250                    index: **index,
251                    field: (*field).to_string(),
252                });
253            }
254            if seen.contains(*field) {
255                return Err(ValidateError::IndexFieldDuplicate {
256                    index: **index,
257                    field: (*field).to_string(),
258                });
259            }
260            seen.insert(*field);
261
262            let field_type = fields
263                .get(*field)
264                .expect("index field existence checked above");
265            // Indexing is hash-based across all Value variants; only Unsupported is rejected here.
266            // Collisions are detected during unique enforcement and lookups.
267            if matches!(field_type, FieldType::Unsupported) {
268                return Err(ValidateError::IndexFieldUnsupported {
269                    index: **index,
270                    field: (*field).to_string(),
271                });
272            }
273        }
274    }
275
276    Ok(())
277}
278
///
/// SchemaInfo
///
/// Lightweight, runtime-usable field-type map for one entity.
/// This is the *only* schema surface the predicate validator depends on.
///

#[derive(Clone, Debug)]
pub struct SchemaInfo {
    // Field name -> reduced field type, one entry per entity field.
    fields: BTreeMap<String, FieldType>,
}
290
291impl SchemaInfo {
292    #[must_use]
293    #[expect(dead_code)]
294    pub(crate) fn new(fields: impl IntoIterator<Item = (String, FieldType)>) -> Self {
295        Self {
296            fields: fields.into_iter().collect(),
297        }
298    }
299
300    #[must_use]
301    pub(crate) fn field(&self, name: &str) -> Option<&FieldType> {
302        self.fields.get(name)
303    }
304
305    #[must_use]
306    pub fn from_entity_schema(entity: &Entity, schema: &Schema) -> Self {
307        let fields = entity
308            .fields
309            .fields
310            .iter()
311            .map(|field| {
312                let ty = field_type_from_value(&field.value, schema);
313                (field.ident.to_string(), ty)
314            })
315            .collect::<BTreeMap<_, _>>();
316
317        Self { fields }
318    }
319
320    pub fn from_entity_model(model: &EntityModel) -> Result<Self, ValidateError> {
321        // Validate identity constraints before building schema maps.
322        let entity_name = EntityName::try_from_str(model.entity_name).map_err(|err| {
323            ValidateError::InvalidEntityName {
324                name: model.entity_name.to_string(),
325                source: err,
326            }
327        })?;
328
329        if !model
330            .fields
331            .iter()
332            .any(|field| std::ptr::eq(field, model.primary_key))
333        {
334            return Err(ValidateError::InvalidPrimaryKey {
335                field: model.primary_key.name.to_string(),
336            });
337        }
338
339        let mut fields = BTreeMap::new();
340        for field in model.fields {
341            if fields.contains_key(field.name) {
342                return Err(ValidateError::DuplicateField {
343                    field: field.name.to_string(),
344                });
345            }
346            let ty = field_type_from_model_kind(&field.kind);
347            fields.insert(field.name.to_string(), ty);
348        }
349
350        let pk_field_type = fields
351            .get(model.primary_key.name)
352            .expect("primary key verified above");
353        if !pk_field_type.is_keyable() {
354            return Err(ValidateError::InvalidPrimaryKeyType {
355                field: model.primary_key.name.to_string(),
356            });
357        }
358
359        validate_index_fields(&fields, model.indexes)?;
360        for index in model.indexes {
361            IndexName::try_from_parts(&entity_name, index.fields).map_err(|err| {
362                ValidateError::InvalidIndexName {
363                    index: **index,
364                    source: err,
365                }
366            })?;
367        }
368
369        Ok(Self { fields })
370    }
371}
372
/// Predicate/schema validation failures, including invalid model contracts.
#[derive(Debug, thiserror::Error)]
pub enum ValidateError {
    /// The model's entity name was rejected by `EntityName::try_from_str`.
    #[error("invalid entity name '{name}': {source}")]
    InvalidEntityName {
        name: String,
        #[source]
        source: EntityNameError,
    },

    /// An index name could not be built by `IndexName::try_from_parts`.
    #[error("invalid index name for '{index}': {source}")]
    InvalidIndexName {
        index: IndexModel,
        #[source]
        source: IndexNameError,
    },

    /// A predicate references a field that is not in the schema.
    #[error("unknown field '{field}'")]
    UnknownField { field: String },

    /// A predicate references a field whose type is `FieldType::Unsupported`.
    #[error("unsupported field type for '{field}'")]
    UnsupportedFieldType { field: String },

    /// The entity model declares the same field name more than once.
    #[error("duplicate field '{field}'")]
    DuplicateField { field: String },

    /// The model's primary key is not one of the model's field entries.
    #[error("primary key '{field}' not present in entity fields")]
    InvalidPrimaryKey { field: String },

    /// The primary key's type fails `FieldType::is_keyable`.
    #[error("primary key '{field}' has an unsupported type")]
    InvalidPrimaryKeyType { field: String },

    /// An index references a field that the entity does not declare.
    #[error("index '{index}' references unknown field '{field}'")]
    IndexFieldUnknown { index: IndexModel, field: String },

    /// An index references a field whose type is `FieldType::Unsupported`.
    #[error("index '{index}' references unsupported field '{field}'")]
    IndexFieldUnsupported { index: IndexModel, field: String },

    /// An index lists the same field more than once.
    #[error("index '{index}' repeats field '{field}'")]
    IndexFieldDuplicate { index: IndexModel, field: String },

    /// Two indexes of the same entity share a name.
    #[error("duplicate index name '{name}'")]
    DuplicateIndexName { name: String },

    /// The operator cannot be applied to a field of this type.
    #[error("operator {op} is not valid for field '{field}'")]
    InvalidOperator { field: String, op: String },

    /// The requested coercion is not allowed for this field/literal pairing.
    #[error("coercion {coercion:?} is not valid for field '{field}'")]
    InvalidCoercion { field: String, coercion: CoercionId },

    /// The literal's shape or type does not fit the field.
    #[error("invalid literal for field '{field}': {message}")]
    InvalidLiteral { field: String, message: String },

    // NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("schema unavailable: {0}")]
    SchemaUnavailable(String),
}
429
430pub fn validate(schema: &SchemaInfo, predicate: &Predicate) -> Result<(), ValidateError> {
431    match predicate {
432        Predicate::True | Predicate::False => Ok(()),
433        Predicate::And(children) | Predicate::Or(children) => {
434            for child in children {
435                validate(schema, child)?;
436            }
437            Ok(())
438        }
439        Predicate::Not(inner) => validate(schema, inner),
440        Predicate::Compare(cmp) => validate_compare(schema, cmp),
441        Predicate::IsNull { field } | Predicate::IsMissing { field } => {
442            ensure_field(schema, field).map(|_| ())
443        }
444        Predicate::IsEmpty { field } => {
445            let field_type = ensure_field(schema, field)?;
446            if field_type.is_text() || field_type.is_collection() {
447                Ok(())
448            } else {
449                Err(ValidateError::InvalidOperator {
450                    field: field.clone(),
451                    op: "is_empty".to_string(),
452                })
453            }
454        }
455        Predicate::IsNotEmpty { field } => {
456            let field_type = ensure_field(schema, field)?;
457            if field_type.is_text() || field_type.is_collection() {
458                Ok(())
459            } else {
460                Err(ValidateError::InvalidOperator {
461                    field: field.clone(),
462                    op: "is_not_empty".to_string(),
463                })
464            }
465        }
466        Predicate::MapContainsKey {
467            field,
468            key,
469            coercion,
470        } => validate_map_key(schema, field, key, coercion),
471        Predicate::MapContainsValue {
472            field,
473            value,
474            coercion,
475        } => validate_map_value(schema, field, value, coercion),
476        Predicate::MapContainsEntry {
477            field,
478            key,
479            value,
480            coercion,
481        } => validate_map_entry(schema, field, key, value, coercion),
482        Predicate::TextContains { field, value } => {
483            validate_text_contains(schema, field, value, "text_contains")
484        }
485        Predicate::TextContainsCi { field, value } => {
486            validate_text_contains(schema, field, value, "text_contains_ci")
487        }
488    }
489}
490
491pub fn validate_model(model: &EntityModel, predicate: &Predicate) -> Result<(), ValidateError> {
492    let schema = SchemaInfo::from_entity_model(model)?;
493    validate(&schema, predicate)
494}
495
496fn validate_compare(schema: &SchemaInfo, cmp: &ComparePredicate) -> Result<(), ValidateError> {
497    let field_type = ensure_field(schema, &cmp.field)?;
498
499    match cmp.op {
500        CompareOp::Eq | CompareOp::Ne => {
501            validate_eq_ne(&cmp.field, field_type, &cmp.value, &cmp.coercion)
502        }
503        CompareOp::Lt | CompareOp::Lte | CompareOp::Gt | CompareOp::Gte => {
504            validate_ordering(&cmp.field, field_type, &cmp.value, &cmp.coercion, cmp.op)
505        }
506        CompareOp::In | CompareOp::NotIn => {
507            validate_in(&cmp.field, field_type, &cmp.value, &cmp.coercion)
508        }
509        CompareOp::AnyIn | CompareOp::AllIn => {
510            validate_any_all_in(&cmp.field, field_type, &cmp.value, &cmp.coercion)
511        }
512        CompareOp::Contains => validate_contains(&cmp.field, field_type, &cmp.value, &cmp.coercion),
513        CompareOp::StartsWith | CompareOp::EndsWith => {
514            validate_text_op(&cmp.field, field_type, &cmp.value, &cmp.coercion, cmp.op)
515        }
516    }
517}
518
519fn validate_eq_ne(
520    field: &str,
521    field_type: &FieldType,
522    value: &Value,
523    coercion: &CoercionSpec,
524) -> Result<(), ValidateError> {
525    if field_type.is_list_like() {
526        ensure_list_literal(field, value, field_type)?;
527    } else if field_type.is_map() {
528        ensure_map_literal(field, value, field_type)?;
529    } else if matches!(value, Value::List(_)) {
530        return Err(ValidateError::InvalidLiteral {
531            field: field.to_string(),
532            message: "expected scalar literal".to_string(),
533        });
534    }
535
536    ensure_coercion(field, field_type, value, coercion)
537}
538
539fn validate_ordering(
540    field: &str,
541    field_type: &FieldType,
542    value: &Value,
543    coercion: &CoercionSpec,
544    op: CompareOp,
545) -> Result<(), ValidateError> {
546    if matches!(coercion.id, CoercionId::CollectionElement) {
547        return Err(ValidateError::InvalidCoercion {
548            field: field.to_string(),
549            coercion: coercion.id,
550        });
551    }
552
553    if !field_type.is_orderable() {
554        return Err(ValidateError::InvalidOperator {
555            field: field.to_string(),
556            op: format!("{op:?}"),
557        });
558    }
559
560    if matches!(value, Value::List(_)) {
561        return Err(ValidateError::InvalidLiteral {
562            field: field.to_string(),
563            message: "expected scalar literal".to_string(),
564        });
565    }
566
567    ensure_coercion(field, field_type, value, coercion)
568}
569
570fn validate_in(
571    field: &str,
572    field_type: &FieldType,
573    value: &Value,
574    coercion: &CoercionSpec,
575) -> Result<(), ValidateError> {
576    if field_type.is_collection() {
577        return Err(ValidateError::InvalidOperator {
578            field: field.to_string(),
579            op: format!("{:?}", CompareOp::In),
580        });
581    }
582
583    let Value::List(items) = value else {
584        return Err(ValidateError::InvalidLiteral {
585            field: field.to_string(),
586            message: "expected list literal".to_string(),
587        });
588    };
589
590    for item in items {
591        ensure_coercion(field, field_type, item, coercion)?;
592    }
593
594    Ok(())
595}
596
597fn validate_any_all_in(
598    field: &str,
599    field_type: &FieldType,
600    value: &Value,
601    coercion: &CoercionSpec,
602) -> Result<(), ValidateError> {
603    let element_type = field_type
604        .element_type()
605        .ok_or_else(|| ValidateError::InvalidOperator {
606            field: field.to_string(),
607            op: format!("{:?}", CompareOp::AnyIn),
608        })?;
609
610    let Value::List(items) = value else {
611        return Err(ValidateError::InvalidLiteral {
612            field: field.to_string(),
613            message: "expected list literal".to_string(),
614        });
615    };
616
617    for item in items {
618        ensure_coercion(field, element_type, item, coercion)?;
619    }
620
621    Ok(())
622}
623
624fn validate_contains(
625    field: &str,
626    field_type: &FieldType,
627    value: &Value,
628    coercion: &CoercionSpec,
629) -> Result<(), ValidateError> {
630    if field_type.is_text() {
631        if !matches!(coercion.id, CoercionId::Strict | CoercionId::TextCasefold) {
632            return Err(ValidateError::InvalidCoercion {
633                field: field.to_string(),
634                coercion: coercion.id,
635            });
636        }
637        if !matches!(value, Value::Text(_)) {
638            return Err(ValidateError::InvalidLiteral {
639                field: field.to_string(),
640                message: "expected text literal".to_string(),
641            });
642        }
643
644        return ensure_coercion(field, field_type, value, coercion);
645    }
646
647    let element_type = field_type
648        .element_type()
649        .ok_or_else(|| ValidateError::InvalidOperator {
650            field: field.to_string(),
651            op: format!("{:?}", CompareOp::Contains),
652        })?;
653
654    ensure_coercion(field, element_type, value, coercion)
655}
656
657fn validate_text_op(
658    field: &str,
659    field_type: &FieldType,
660    value: &Value,
661    coercion: &CoercionSpec,
662    op: CompareOp,
663) -> Result<(), ValidateError> {
664    if !field_type.is_text() {
665        return Err(ValidateError::InvalidOperator {
666            field: field.to_string(),
667            op: format!("{op:?}"),
668        });
669    }
670
671    if !matches!(coercion.id, CoercionId::Strict | CoercionId::TextCasefold) {
672        return Err(ValidateError::InvalidCoercion {
673            field: field.to_string(),
674            coercion: coercion.id,
675        });
676    }
677
678    if !matches!(value, Value::Text(_)) {
679        return Err(ValidateError::InvalidLiteral {
680            field: field.to_string(),
681            message: "expected text literal".to_string(),
682        });
683    }
684
685    ensure_coercion(field, field_type, value, coercion)
686}
687
688fn validate_map_key(
689    schema: &SchemaInfo,
690    field: &str,
691    key: &Value,
692    coercion: &CoercionSpec,
693) -> Result<(), ValidateError> {
694    let field_type = ensure_field(schema, field)?;
695    let (key_type, _) = field_type
696        .map_types()
697        .ok_or_else(|| ValidateError::InvalidOperator {
698            field: field.to_string(),
699            op: "map_contains_key".to_string(),
700        })?;
701
702    ensure_coercion(field, key_type, key, coercion)
703}
704
705fn validate_map_value(
706    schema: &SchemaInfo,
707    field: &str,
708    value: &Value,
709    coercion: &CoercionSpec,
710) -> Result<(), ValidateError> {
711    let field_type = ensure_field(schema, field)?;
712    let (_, value_type) = field_type
713        .map_types()
714        .ok_or_else(|| ValidateError::InvalidOperator {
715            field: field.to_string(),
716            op: "map_contains_value".to_string(),
717        })?;
718
719    ensure_coercion(field, value_type, value, coercion)
720}
721
722fn validate_map_entry(
723    schema: &SchemaInfo,
724    field: &str,
725    key: &Value,
726    value: &Value,
727    coercion: &CoercionSpec,
728) -> Result<(), ValidateError> {
729    let field_type = ensure_field(schema, field)?;
730    let (key_type, value_type) =
731        field_type
732            .map_types()
733            .ok_or_else(|| ValidateError::InvalidOperator {
734                field: field.to_string(),
735                op: "map_contains_entry".to_string(),
736            })?;
737
738    ensure_coercion(field, key_type, key, coercion)?;
739    ensure_coercion(field, value_type, value, coercion)?;
740
741    Ok(())
742}
743
744/// Validate substring predicates on text fields.
745fn validate_text_contains(
746    schema: &SchemaInfo,
747    field: &str,
748    value: &Value,
749    op: &str,
750) -> Result<(), ValidateError> {
751    let field_type = ensure_field(schema, field)?;
752    if !field_type.is_text() {
753        return Err(ValidateError::InvalidOperator {
754            field: field.to_string(),
755            op: op.to_string(),
756        });
757    }
758
759    if !matches!(value, Value::Text(_)) {
760        return Err(ValidateError::InvalidLiteral {
761            field: field.to_string(),
762            message: "expected text literal".to_string(),
763        });
764    }
765
766    Ok(())
767}
768
769fn ensure_field<'a>(schema: &'a SchemaInfo, field: &str) -> Result<&'a FieldType, ValidateError> {
770    let field_type = schema
771        .field(field)
772        .ok_or_else(|| ValidateError::UnknownField {
773            field: field.to_string(),
774        })?;
775
776    if matches!(field_type, FieldType::Unsupported) {
777        return Err(ValidateError::UnsupportedFieldType {
778            field: field.to_string(),
779        });
780    }
781
782    Ok(field_type)
783}
784
785fn ensure_coercion(
786    field: &str,
787    field_type: &FieldType,
788    literal: &Value,
789    coercion: &CoercionSpec,
790) -> Result<(), ValidateError> {
791    let left_family = field_type
792        .family()
793        .ok_or_else(|| ValidateError::UnsupportedFieldType {
794            field: field.to_string(),
795        })?;
796    let right_family = literal.family();
797
798    if !supports_coercion(left_family, right_family, coercion.id) {
799        return Err(ValidateError::InvalidCoercion {
800            field: field.to_string(),
801            coercion: coercion.id,
802        });
803    }
804
805    if matches!(
806        coercion.id,
807        CoercionId::Strict | CoercionId::CollectionElement
808    ) && !literal_matches_type(literal, field_type)
809    {
810        return Err(ValidateError::InvalidLiteral {
811            field: field.to_string(),
812            message: "literal type does not match field type".to_string(),
813        });
814    }
815
816    Ok(())
817}
818
819fn ensure_list_literal(
820    field: &str,
821    literal: &Value,
822    field_type: &FieldType,
823) -> Result<(), ValidateError> {
824    if !literal_matches_type(literal, field_type) {
825        return Err(ValidateError::InvalidLiteral {
826            field: field.to_string(),
827            message: "list literal does not match field element type".to_string(),
828        });
829    }
830
831    Ok(())
832}
833
834fn ensure_map_literal(
835    field: &str,
836    literal: &Value,
837    field_type: &FieldType,
838) -> Result<(), ValidateError> {
839    if !literal_matches_type(literal, field_type) {
840        return Err(ValidateError::InvalidLiteral {
841            field: field.to_string(),
842            message: "map literal does not match field key/value types".to_string(),
843        });
844    }
845
846    Ok(())
847}
848
849pub(crate) fn literal_matches_type(literal: &Value, field_type: &FieldType) -> bool {
850    match field_type {
851        FieldType::Scalar(inner) => inner.matches_value(literal),
852        FieldType::List(element) | FieldType::Set(element) => match literal {
853            Value::List(items) => items.iter().all(|item| literal_matches_type(item, element)),
854            _ => false,
855        },
856        FieldType::Map { key, value } => match literal {
857            Value::List(entries) => entries.iter().all(|entry| match entry {
858                Value::List(pair) if pair.len() == 2 => {
859                    literal_matches_type(&pair[0], key) && literal_matches_type(&pair[1], value)
860                }
861                _ => false,
862            }),
863            _ => false,
864        },
865        FieldType::Unsupported => false,
866    }
867}
868
869fn field_type_from_value(value: &SValue, schema: &Schema) -> FieldType {
870    let base = field_type_from_item(&value.item, schema);
871
872    match value.cardinality {
873        Cardinality::Many => FieldType::List(Box::new(base)),
874        Cardinality::One | Cardinality::Opt => base,
875    }
876}
877
878fn field_type_from_item(item: &Item, schema: &Schema) -> FieldType {
879    match &item.target {
880        ItemTarget::Primitive(prim) => FieldType::Scalar(scalar_from_primitive(*prim)),
881        ItemTarget::Is(path) => {
882            if schema.cast_node::<Enum>(path).is_ok() {
883                return FieldType::Scalar(ScalarType::Enum);
884            }
885            if let Ok(node) = schema.cast_node::<Newtype>(path) {
886                return field_type_from_item(&node.item, schema);
887            }
888            if let Ok(node) = schema.cast_node::<List>(path) {
889                return FieldType::List(Box::new(field_type_from_item(&node.item, schema)));
890            }
891            if let Ok(node) = schema.cast_node::<Set>(path) {
892                return FieldType::Set(Box::new(field_type_from_item(&node.item, schema)));
893            }
894            if let Ok(node) = schema.cast_node::<Map>(path) {
895                let key = field_type_from_item(&node.key, schema);
896                let value = field_type_from_value(&node.value, schema);
897                return FieldType::Map {
898                    key: Box::new(key),
899                    value: Box::new(value),
900                };
901            }
902            if schema.cast_node::<Record>(path).is_ok() {
903                return FieldType::Unsupported;
904            }
905            if schema.cast_node::<Tuple>(path).is_ok() {
906                return FieldType::Unsupported;
907            }
908
909            FieldType::Unsupported
910        }
911    }
912}
913
914const fn scalar_from_primitive(prim: Primitive) -> ScalarType {
915    match prim {
916        Primitive::Account => ScalarType::Account,
917        Primitive::Blob => ScalarType::Blob,
918        Primitive::Bool => ScalarType::Bool,
919        Primitive::Date => ScalarType::Date,
920        Primitive::Decimal => ScalarType::Decimal,
921        Primitive::Duration => ScalarType::Duration,
922        Primitive::E8s => ScalarType::E8s,
923        Primitive::E18s => ScalarType::E18s,
924        Primitive::Float32 => ScalarType::Float32,
925        Primitive::Float64 => ScalarType::Float64,
926        Primitive::Int => ScalarType::IntBig,
927        Primitive::Int8 | Primitive::Int16 | Primitive::Int32 | Primitive::Int64 => ScalarType::Int,
928        Primitive::Int128 => ScalarType::Int128,
929        Primitive::Nat => ScalarType::UintBig,
930        Primitive::Nat8 | Primitive::Nat16 | Primitive::Nat32 | Primitive::Nat64 => {
931            ScalarType::Uint
932        }
933        Primitive::Nat128 => ScalarType::Uint128,
934        Primitive::Principal => ScalarType::Principal,
935        Primitive::Subaccount => ScalarType::Subaccount,
936        Primitive::Text => ScalarType::Text,
937        Primitive::Timestamp => ScalarType::Timestamp,
938        Primitive::Ulid => ScalarType::Ulid,
939        Primitive::Unit => ScalarType::Unit,
940    }
941}
942
943fn field_type_from_model_kind(kind: &EntityFieldKind) -> FieldType {
944    match kind {
945        EntityFieldKind::Account => FieldType::Scalar(ScalarType::Account),
946        EntityFieldKind::Blob => FieldType::Scalar(ScalarType::Blob),
947        EntityFieldKind::Bool => FieldType::Scalar(ScalarType::Bool),
948        EntityFieldKind::Date => FieldType::Scalar(ScalarType::Date),
949        EntityFieldKind::Decimal => FieldType::Scalar(ScalarType::Decimal),
950        EntityFieldKind::Duration => FieldType::Scalar(ScalarType::Duration),
951        EntityFieldKind::Enum => FieldType::Scalar(ScalarType::Enum),
952        EntityFieldKind::E8s => FieldType::Scalar(ScalarType::E8s),
953        EntityFieldKind::E18s => FieldType::Scalar(ScalarType::E18s),
954        EntityFieldKind::Float32 => FieldType::Scalar(ScalarType::Float32),
955        EntityFieldKind::Float64 => FieldType::Scalar(ScalarType::Float64),
956        EntityFieldKind::Int => FieldType::Scalar(ScalarType::Int),
957        EntityFieldKind::Int128 => FieldType::Scalar(ScalarType::Int128),
958        EntityFieldKind::IntBig => FieldType::Scalar(ScalarType::IntBig),
959        EntityFieldKind::Principal => FieldType::Scalar(ScalarType::Principal),
960        EntityFieldKind::Subaccount => FieldType::Scalar(ScalarType::Subaccount),
961        EntityFieldKind::Text => FieldType::Scalar(ScalarType::Text),
962        EntityFieldKind::Timestamp => FieldType::Scalar(ScalarType::Timestamp),
963        EntityFieldKind::Uint => FieldType::Scalar(ScalarType::Uint),
964        EntityFieldKind::Uint128 => FieldType::Scalar(ScalarType::Uint128),
965        EntityFieldKind::UintBig => FieldType::Scalar(ScalarType::UintBig),
966        EntityFieldKind::Ulid => FieldType::Scalar(ScalarType::Ulid),
967        EntityFieldKind::Unit => FieldType::Scalar(ScalarType::Unit),
968        EntityFieldKind::List(inner) => {
969            FieldType::List(Box::new(field_type_from_model_kind(inner)))
970        }
971        EntityFieldKind::Set(inner) => FieldType::Set(Box::new(field_type_from_model_kind(inner))),
972        EntityFieldKind::Map { key, value } => FieldType::Map {
973            key: Box::new(field_type_from_model_kind(key)),
974            value: Box::new(field_type_from_model_kind(value)),
975        },
976        EntityFieldKind::Unsupported => FieldType::Unsupported,
977    }
978}
979
980impl fmt::Display for FieldType {
981    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
982        match self {
983            Self::Scalar(inner) => write!(f, "{inner:?}"),
984            Self::List(inner) => write!(f, "List<{inner}>"),
985            Self::Set(inner) => write!(f, "Set<{inner}>"),
986            Self::Map { key, value } => write!(f, "Map<{key}, {value}>"),
987            Self::Unsupported => write!(f, "Unsupported"),
988        }
989    }
990}
991
992///
993/// TESTS
994///
995
#[cfg(test)]
mod tests {
    use super::{ValidateError, validate_model};
    use crate::{
        db::query::{
            FieldRef,
            predicate::{CoercionId, Predicate},
        },
        model::{
            entity::EntityModel,
            field::{EntityFieldKind, EntityFieldModel},
            index::IndexModel,
        },
        types::Ulid,
    };

    /// Shorthand for a named field model of the given kind.
    fn field(name: &'static str, kind: EntityFieldKind) -> EntityFieldModel {
        EntityFieldModel { name, kind }
    }

    /// Builds a test entity model from `fields`, using `fields[pk_index]` as
    /// the primary key and no indexes.
    ///
    /// The field list is intentionally leaked so it can be borrowed for
    /// `'static`, which the model's field slice is annotated with here.
    fn model_with_fields(fields: Vec<EntityFieldModel>, pk_index: usize) -> EntityModel {
        let leaked: &'static [EntityFieldModel] = Box::leak(fields.into_boxed_slice());
        let no_indexes: &'static [&'static IndexModel] = &[];

        EntityModel {
            path: "test::Entity",
            entity_name: "TestEntity",
            primary_key: &leaked[pk_index],
            fields: leaked,
            indexes: no_indexes,
        }
    }

    /// Builds a model whose first field is a ULID `id` primary key, followed
    /// by the `extra` fields in order.
    fn model_keyed_by_id(extra: Vec<EntityFieldModel>) -> EntityModel {
        let mut fields = vec![field("id", EntityFieldKind::Ulid)];
        fields.extend(extra);

        model_with_fields(fields, 0)
    }

    /// Strict equality, case-insensitive equality and ordering comparisons
    /// against matching scalar fields all validate.
    #[test]
    fn validate_model_accepts_scalars_and_coercions() {
        let entity = model_keyed_by_id(vec![
            field("email", EntityFieldKind::Text),
            field("age", EntityFieldKind::Uint),
            field("created_at", EntityFieldKind::Timestamp),
            field("active", EntityFieldKind::Bool),
        ]);

        let by_id = FieldRef::new("id").eq(Ulid::nil());
        let by_email = FieldRef::new("email").eq_ci("User@example.com");
        let by_age = FieldRef::new("age").lt(30u32);
        let filter = Predicate::And(vec![by_id, by_email, by_age]);

        let outcome = validate_model(&entity, &filter);
        assert!(outcome.is_ok());
    }

    /// Emptiness checks on list/set fields and a strict map-entry lookup
    /// validate; the same map-entry lookup with `TextCasefold` is rejected
    /// as an invalid coercion.
    #[test]
    fn validate_model_accepts_collections_and_map_contains() {
        let entity = model_keyed_by_id(vec![
            field("tags", EntityFieldKind::List(&EntityFieldKind::Text)),
            field(
                "principals",
                EntityFieldKind::Set(&EntityFieldKind::Principal),
            ),
            field(
                "attributes",
                EntityFieldKind::Map {
                    key: &EntityFieldKind::Text,
                    value: &EntityFieldKind::Uint,
                },
            ),
        ]);

        let no_tags = FieldRef::new("tags").is_empty();
        let some_principals = FieldRef::new("principals").is_not_empty();
        let strict_entry =
            FieldRef::new("attributes").map_contains_entry("k", 1u64, CoercionId::Strict);
        let accepted = Predicate::And(vec![no_tags, some_principals, strict_entry]);

        let outcome = validate_model(&entity, &accepted);
        assert!(outcome.is_ok());

        let casefold_entry =
            FieldRef::new("attributes").map_contains_entry("k", 1u64, CoercionId::TextCasefold);

        let outcome = validate_model(&entity, &casefold_entry);
        assert!(matches!(
            outcome,
            Err(ValidateError::InvalidCoercion { .. })
        ));
    }

    /// A predicate that touches a field of unsupported kind is rejected and
    /// the error names that field.
    #[test]
    fn validate_model_rejects_unsupported_fields() {
        let entity = model_keyed_by_id(vec![field("broken", EntityFieldKind::Unsupported)]);

        let filter = FieldRef::new("broken").eq(1u64);

        let outcome = validate_model(&entity, &filter);
        assert!(matches!(
            outcome,
            Err(ValidateError::UnsupportedFieldType { field }) if field == "broken"
        ));
    }

    /// Both the case-sensitive and the case-insensitive substring operators
    /// validate against a text field.
    #[test]
    fn validate_model_accepts_text_contains() {
        let entity = model_keyed_by_id(vec![field("email", EntityFieldKind::Text)]);

        let case_sensitive = FieldRef::new("email").text_contains("example");
        let outcome = validate_model(&entity, &case_sensitive);
        assert!(outcome.is_ok());

        let case_insensitive = FieldRef::new("email").text_contains_ci("EXAMPLE");
        let outcome = validate_model(&entity, &case_insensitive);
        assert!(outcome.is_ok());
    }

    /// The substring operator on a non-text field is rejected as an invalid
    /// operator, reporting both the field and the operator name.
    #[test]
    fn validate_model_rejects_text_contains_on_non_text() {
        let entity = model_keyed_by_id(vec![field("age", EntityFieldKind::Uint)]);

        let filter = FieldRef::new("age").text_contains("1");

        let outcome = validate_model(&entity, &filter);
        assert!(matches!(
            outcome,
            Err(ValidateError::InvalidOperator { field, op })
                if field == "age" && op == "text_contains"
        ));
    }
}