1use crate::common::{ColumnName, GenericValue, IndexName};
13
14use super::{
15 Aggregate, AggregateOp, Column, ContentHash, Field, FieldSource, FlussoType, IndexMapping,
16 IndexSchema, Mapping, MappingType, Relation, ResolvedField,
17};
18
19impl IndexSchema {
20 pub fn resolve(&self, index: IndexName) -> IndexMapping {
22 resolve_index(index, self)
23 }
24}
25
26fn resolve_index(index: IndexName, schema: &IndexSchema) -> IndexMapping {
27 IndexMapping {
28 index,
29 hash: ContentHash::of(schema),
32 fields: resolve_fields(&schema.fields, schema.primary_key.as_ref()),
33 }
34}
35
36fn resolve_fields(fields: &[Field], primary_key: Option<&ColumnName>) -> Vec<ResolvedField> {
40 fields
41 .iter()
42 .map(|field| resolve_field(field, primary_key))
43 .collect()
44}
45
46fn resolve_field(field: &Field, primary_key: Option<&ColumnName>) -> ResolvedField {
47 let (child_fields, child_pk): (&[Field], Option<&ColumnName>) = match &field.source {
48 FieldSource::Relation(Relation::Join(join)) => (&join.fields, Some(&join.primary_key)),
49 FieldSource::Group(fields) => (fields, primary_key),
50 _ => (&[], primary_key),
51 };
52 let children = resolve_fields(child_fields, child_pk);
53
54 let (mapping_type, nullable, array) = type_and_nullability(field, primary_key);
55 let mapping = Mapping {
56 mapping_type,
57 extra: field.options.clone(),
58 map_values: map_value_type(field),
59 decimal: is_decimal(field),
60 };
61
62 ResolvedField {
63 name: field.field.clone(),
64 mapping,
65 nullable,
66 array,
67 children,
68 }
69}
70
71fn map_value_type(field: &Field) -> Option<MappingType> {
75 match &field.source {
76 FieldSource::Column(Column {
77 ty: FlussoType::Map { values },
78 ..
79 }) => Some(values.opensearch()),
80 _ => None,
81 }
82}
83
84fn is_decimal(field: &Field) -> bool {
89 let ty_is_decimal = |ty: &FlussoType| matches!(ty, FlussoType::Decimal);
90 match &field.source {
91 FieldSource::Column(Column { ty, .. }) => ty_is_decimal(ty),
92 FieldSource::Constant(GenericValue::Decimal(_)) => true,
93 FieldSource::Relation(Relation::Aggregate(aggregate)) => match &aggregate.op {
94 AggregateOp::Sum(_) | AggregateOp::Min(_) | AggregateOp::Max(_) => {
95 aggregate.value_type.as_ref().is_some_and(ty_is_decimal)
96 }
97 AggregateOp::Ids { element_type } => ty_is_decimal(element_type),
98 AggregateOp::Count | AggregateOp::Avg(_) => false,
99 },
100 _ => false,
101 }
102}
103
104fn type_and_nullability(
107 field: &Field,
108 primary_key: Option<&ColumnName>,
109) -> (MappingType, bool, bool) {
110 match &field.source {
111 FieldSource::Column(Column {
112 column,
113 ty,
114 nullable,
115 default,
116 ..
117 }) => {
118 let forced_non_null = primary_key == Some(column) || default.is_some();
119 (ty.opensearch(), *nullable && !forced_non_null, false)
120 }
121 FieldSource::Group(_) => (MappingType::Object, false, false),
122 FieldSource::Geo(geo) => (
123 MappingType::Other("geo_point".to_owned()),
124 geo.nullable,
125 false,
126 ),
127 FieldSource::Constant(value) => (
128 constant_mapping_type(value),
129 matches!(value, GenericValue::Null),
130 false,
131 ),
132 FieldSource::Relation(Relation::Join(join)) => {
133 let mapping_type = if join.kind.is_to_many() {
134 MappingType::Nested
135 } else {
136 MappingType::Object
137 };
138 (mapping_type, join.nullable, false)
139 }
140 FieldSource::Relation(Relation::Aggregate(aggregate)) => aggregate_type(aggregate),
141 }
142}
143
144fn aggregate_type(aggregate: &Aggregate) -> (MappingType, bool, bool) {
145 match &aggregate.op {
146 AggregateOp::Count => (MappingType::Long, false, false),
147 AggregateOp::Avg(_) => (MappingType::Double, true, false),
148 AggregateOp::Sum(_) | AggregateOp::Min(_) | AggregateOp::Max(_) => {
149 let mapping_type = aggregate
150 .value_type
151 .as_ref()
152 .map(|ty| ty.opensearch())
153 .unwrap_or(MappingType::Double);
156 (mapping_type, true, false)
157 }
158 AggregateOp::Ids { element_type } => (element_type.opensearch(), false, true),
159 }
160}
161
162#[cfg(test)]
163mod tests;
164
165fn constant_mapping_type(value: &GenericValue) -> MappingType {
167 match value {
168 GenericValue::Bool(_) => MappingType::Boolean,
169 GenericValue::SmallInt(_) => MappingType::Short,
170 GenericValue::Int(_) => MappingType::Integer,
171 GenericValue::BigInt(_) => MappingType::Long,
172 GenericValue::Float(_) => MappingType::Float,
173 GenericValue::Double(_) | GenericValue::Decimal(_) => MappingType::Double,
174 GenericValue::Date(_)
175 | GenericValue::Time(_)
176 | GenericValue::Timestamp(_)
177 | GenericValue::TimestampTz(_) => MappingType::Date,
178 GenericValue::Bytes(_) => MappingType::Other("binary".to_owned()),
179 GenericValue::Array(items) => items
180 .first()
181 .map(constant_mapping_type)
182 .unwrap_or(MappingType::Keyword),
183 GenericValue::Map(_) => MappingType::Object,
184 GenericValue::String(_) | GenericValue::Uuid(_) | GenericValue::Null => {
185 MappingType::Keyword
186 }
187 }
188}