Skip to main content

schema_core/config/
projection.rs

1//! Projecting a self-describing schema into a fully-typed mapping — without a
2//! database.
3//!
4//! Every gap a thin config once left to the source is now stated in the schema:
5//! a column field carries its [`FlussoType`] and nullability,
6//! an aggregate its result type. So the mapping follows from the schema alone.
7//! The structural rules are unchanged from when the source derived them — a
8//! group is an `object`, a to-many join is a `nested` array, a `count` is a
9//! non-null `long`, a primary key is never null — they just no longer need a
10//! round-trip to ask.
11
12use crate::common::{ColumnName, GenericValue, IndexName};
13
14use super::{
15    Aggregate, AggregateOp, Column, ContentHash, Field, FieldSource, FlussoType, IndexMapping,
16    IndexSchema, Mapping, MappingType, Relation, ResolvedField,
17};
18
impl IndexSchema {
    /// Project this schema into its fully-typed [`IndexMapping`].
    ///
    /// `index` is moved into the returned mapping as its name; the content
    /// hash and the resolved field tree are both derived from `self` by
    /// `resolve_index`. The mapping is returned directly, not wrapped in a
    /// `Result`.
    pub fn resolve(&self, index: IndexName) -> IndexMapping {
        // Thin entry point: all of the work happens in `resolve_index`.
        resolve_index(index, self)
    }
}
25
26fn resolve_index(index: IndexName, schema: &IndexSchema) -> IndexMapping {
27    IndexMapping {
28        index,
29        // Hash the parsed schema, not the file: structural changes (including a
30        // declared type) flip the hash; cosmetic file changes do not.
31        hash: ContentHash::of(schema),
32        fields: resolve_fields(&schema.fields, schema.primary_key.as_ref()),
33    }
34}
35
36/// Resolve a list of fields. `primary_key` is the root table's key while we are
37/// still on the root row (it passes through groups, which stay on the same row);
38/// it is `None` once we cross into a related table via a join.
39fn resolve_fields(fields: &[Field], primary_key: Option<&ColumnName>) -> Vec<ResolvedField> {
40    fields
41        .iter()
42        .map(|field| resolve_field(field, primary_key))
43        .collect()
44}
45
46fn resolve_field(field: &Field, primary_key: Option<&ColumnName>) -> ResolvedField {
47    let (child_fields, child_pk): (&[Field], Option<&ColumnName>) = match &field.source {
48        FieldSource::Relation(Relation::Join(join)) => (&join.fields, Some(&join.primary_key)),
49        FieldSource::Group(fields) => (fields, primary_key),
50        _ => (&[], primary_key),
51    };
52    let children = resolve_fields(child_fields, child_pk);
53
54    let (mapping_type, nullable, array) = type_and_nullability(field, primary_key);
55    let mapping = Mapping {
56        mapping_type,
57        extra: field.options.clone(),
58        map_values: map_value_type(field),
59        decimal: is_decimal(field),
60    };
61
62    ResolvedField {
63        name: field.field.clone(),
64        mapping,
65        nullable,
66        array,
67        children,
68    }
69}
70
71/// The value mapping type of a `map` field — `Some(values.opensearch())` for a
72/// column declared [`FlussoType::Map`], `None` for everything else. This is the
73/// only thing distinguishing a `map`'s `object` mapping from a plain one.
74fn map_value_type(field: &Field) -> Option<MappingType> {
75    match &field.source {
76        FieldSource::Column(Column {
77            ty: FlussoType::Map { values },
78            ..
79        }) => Some(values.opensearch()),
80        _ => None,
81    }
82}
83
84/// Whether this field's leaf type is [`FlussoType::Decimal`] — a PG
85/// `numeric`/`decimal` (or a decimal-typed `sum`/`min`/`max`/`ids` aggregate, or
86/// a decimal constant). It maps to OpenSearch `double` either way; this is what
87/// lets a typed binding offer a `Decimal` handle instead of `f64`.
88fn is_decimal(field: &Field) -> bool {
89    let ty_is_decimal = |ty: &FlussoType| matches!(ty, FlussoType::Decimal);
90    match &field.source {
91        FieldSource::Column(Column { ty, .. }) => ty_is_decimal(ty),
92        FieldSource::Constant(GenericValue::Decimal(_)) => true,
93        FieldSource::Relation(Relation::Aggregate(aggregate)) => match &aggregate.op {
94            AggregateOp::Sum(_) | AggregateOp::Min(_) | AggregateOp::Max(_) => {
95                aggregate.value_type.as_ref().is_some_and(ty_is_decimal)
96            }
97            AggregateOp::Ids { element_type } => ty_is_decimal(element_type),
98            AggregateOp::Count | AggregateOp::Avg(_) => false,
99        },
100        _ => false,
101    }
102}
103
104/// Returns `(mapping_type, nullable, array)`. `array` is true only for an `ids`
105/// aggregate, whose `mapping_type` is then the element type.
106fn type_and_nullability(
107    field: &Field,
108    primary_key: Option<&ColumnName>,
109) -> (MappingType, bool, bool) {
110    match &field.source {
111        FieldSource::Column(Column {
112            column,
113            ty,
114            nullable,
115            default,
116            ..
117        }) => {
118            let forced_non_null = primary_key == Some(column) || default.is_some();
119            (ty.opensearch(), *nullable && !forced_non_null, false)
120        }
121        FieldSource::Group(_) => (MappingType::Object, false, false),
122        FieldSource::Geo(geo) => (
123            MappingType::Other("geo_point".to_owned()),
124            geo.nullable,
125            false,
126        ),
127        FieldSource::Constant(value) => (
128            constant_mapping_type(value),
129            matches!(value, GenericValue::Null),
130            false,
131        ),
132        FieldSource::Relation(Relation::Join(join)) => {
133            let mapping_type = if join.kind.is_to_many() {
134                MappingType::Nested
135            } else {
136                MappingType::Object
137            };
138            (mapping_type, join.nullable, false)
139        }
140        FieldSource::Relation(Relation::Aggregate(aggregate)) => aggregate_type(aggregate),
141    }
142}
143
144fn aggregate_type(aggregate: &Aggregate) -> (MappingType, bool, bool) {
145    match &aggregate.op {
146        AggregateOp::Count => (MappingType::Long, false, false),
147        AggregateOp::Avg(_) => (MappingType::Double, true, false),
148        AggregateOp::Sum(_) | AggregateOp::Min(_) | AggregateOp::Max(_) => {
149            let mapping_type = aggregate
150                .value_type
151                .as_ref()
152                .map(|ty| ty.opensearch())
153                // Conversion requires a `value_type` for these ops; `double` is
154                // a defensive fallback that should never be reached.
155                .unwrap_or(MappingType::Double);
156            (mapping_type, true, false)
157        }
158        AggregateOp::Ids { element_type } => (element_type.opensearch(), false, true),
159    }
160}
161
// Unit tests for this module. Declared out of line, so the test code lives in
// a separate `tests` file, and gated on `cfg(test)`, so it is compiled only
// for test builds.
#[cfg(test)]
mod tests;
164
165/// The mapping type a constant value's shape implies.
166fn constant_mapping_type(value: &GenericValue) -> MappingType {
167    match value {
168        GenericValue::Bool(_) => MappingType::Boolean,
169        GenericValue::SmallInt(_) => MappingType::Short,
170        GenericValue::Int(_) => MappingType::Integer,
171        GenericValue::BigInt(_) => MappingType::Long,
172        GenericValue::Float(_) => MappingType::Float,
173        GenericValue::Double(_) | GenericValue::Decimal(_) => MappingType::Double,
174        GenericValue::Date(_)
175        | GenericValue::Time(_)
176        | GenericValue::Timestamp(_)
177        | GenericValue::TimestampTz(_) => MappingType::Date,
178        GenericValue::Bytes(_) => MappingType::Other("binary".to_owned()),
179        GenericValue::Array(items) => items
180            .first()
181            .map(constant_mapping_type)
182            .unwrap_or(MappingType::Keyword),
183        GenericValue::Map(_) => MappingType::Object,
184        GenericValue::String(_) | GenericValue::Uuid(_) | GenericValue::Null => {
185            MappingType::Keyword
186        }
187    }
188}