use crate::common::{ColumnName, GenericValue, IndexName};
use super::{
Aggregate, AggregateOp, Column, ContentHash, Field, FieldSource, FlussoType, IndexMapping,
IndexSchema, Mapping, MappingType, Relation, ResolvedField,
};
impl IndexSchema {
    /// Resolves this schema into the [`IndexMapping`] for the given `index`.
    ///
    /// This is a thin entry point: all of the work happens in `resolve_index`.
    pub fn resolve(&self, index: IndexName) -> IndexMapping {
        let schema: &Self = self;
        resolve_index(index, schema)
    }
}
/// Builds the [`IndexMapping`] for `index` from `schema`.
///
/// The mapping carries the index name, a [`ContentHash`] computed from the
/// schema, and the resolved form of every top-level field. Top-level fields
/// are resolved against the schema's own primary key, if it has one.
fn resolve_index(index: IndexName, schema: &IndexSchema) -> IndexMapping {
    let hash = ContentHash::of(schema);
    let fields = resolve_fields(&schema.fields, schema.primary_key.as_ref());
    IndexMapping {
        index,
        hash,
        fields,
    }
}
/// Resolves each entry of `fields`, in order, against the same `primary_key`.
///
/// Returns one [`ResolvedField`] per input field; an empty slice yields an
/// empty vector.
fn resolve_fields(fields: &[Field], primary_key: Option<&ColumnName>) -> Vec<ResolvedField> {
    let mut resolved = Vec::with_capacity(fields.len());
    for field in fields {
        resolved.push(resolve_field(field, primary_key));
    }
    resolved
}
/// Resolves a single `field` (and, recursively, its children) into a
/// [`ResolvedField`].
///
/// `primary_key` is the primary key of the scope the field lives in. It is
/// used for the field's own nullability and is handed down to the children
/// of a group. A join's children are instead resolved against the join's own
/// primary key. Any other source has no children.
fn resolve_field(field: &Field, primary_key: Option<&ColumnName>) -> ResolvedField {
    let children = match &field.source {
        // Joined rows are keyed by the join's primary key, not the parent's.
        FieldSource::Relation(Relation::Join(join)) => {
            resolve_fields(&join.fields, Some(&join.primary_key))
        }
        // A group stays in the parent's scope, so the parent's key still applies.
        FieldSource::Group(fields) => resolve_fields(fields, primary_key),
        _ => Vec::new(),
    };

    let (mapping_type, nullable, array) = type_and_nullability(field, primary_key);

    ResolvedField {
        name: field.field.clone(),
        mapping: Mapping {
            mapping_type,
            extra: field.options.clone(),
            map_values: map_value_type(field),
            decimal: is_decimal(field),
        },
        nullable,
        array,
        children,
    }
}
/// Returns the mapping type of the values of a map-typed column.
///
/// Yields `Some` only when the field is sourced from a column whose type is
/// `FlussoType::Map`; every other field yields `None`.
fn map_value_type(field: &Field) -> Option<MappingType> {
    if let FieldSource::Column(Column {
        ty: FlussoType::Map { values },
        ..
    }) = &field.source
    {
        Some(values.opensearch())
    } else {
        None
    }
}
/// Reports whether `field` carries decimal values.
///
/// A field is decimal when it is:
/// - a column whose type is `FlussoType::Decimal`,
/// - a `GenericValue::Decimal` constant,
/// - a sum/min/max aggregate whose `value_type` is present and decimal, or
/// - an ids aggregate whose element type is decimal.
///
/// Count and avg aggregates, and every other source, are never decimal.
fn is_decimal(field: &Field) -> bool {
    fn decimal_type(ty: &FlussoType) -> bool {
        matches!(ty, FlussoType::Decimal)
    }

    // Settle the non-aggregate sources right away; only aggregates need a
    // second look at their operation.
    let aggregate = match &field.source {
        FieldSource::Column(Column { ty, .. }) => return decimal_type(ty),
        FieldSource::Constant(GenericValue::Decimal(_)) => return true,
        FieldSource::Relation(Relation::Aggregate(aggregate)) => aggregate,
        _ => return false,
    };

    match &aggregate.op {
        AggregateOp::Count | AggregateOp::Avg(_) => false,
        AggregateOp::Ids { element_type } => decimal_type(element_type),
        AggregateOp::Sum(_) | AggregateOp::Min(_) | AggregateOp::Max(_) => {
            aggregate.value_type.as_ref().is_some_and(decimal_type)
        }
    }
}
/// Computes `(mapping_type, nullable, array)` for `field`.
///
/// - Columns take their type's mapping and are nullable only if the column is
///   declared nullable, is not the scope's `primary_key`, and has no default.
/// - Groups are non-nullable objects.
/// - Geo fields map to `"geo_point"` with the geo source's own nullability.
/// - Constants are nullable only when the value is `GenericValue::Null`.
/// - Joins are nested when to-many, objects otherwise, with the join's own
///   nullability.
/// - Aggregates are delegated to `aggregate_type`, the only case that can set
///   the array flag.
fn type_and_nullability(
    field: &Field,
    primary_key: Option<&ColumnName>,
) -> (MappingType, bool, bool) {
    let (mapping_type, nullable) = match &field.source {
        // Aggregates decide all three components themselves.
        FieldSource::Relation(Relation::Aggregate(aggregate)) => {
            return aggregate_type(aggregate);
        }
        FieldSource::Column(Column {
            column,
            ty,
            nullable,
            default,
            ..
        }) => {
            let is_primary_key = primary_key.is_some_and(|pk| pk == column);
            // A primary key or a defaulted column is treated as non-null
            // regardless of its declared nullability.
            let stays_nullable = *nullable && !is_primary_key && default.is_none();
            (ty.opensearch(), stays_nullable)
        }
        FieldSource::Group(_) => (MappingType::Object, false),
        FieldSource::Geo(geo) => (MappingType::Other("geo_point".to_owned()), geo.nullable),
        FieldSource::Constant(value) => (
            constant_mapping_type(value),
            matches!(value, GenericValue::Null),
        ),
        FieldSource::Relation(Relation::Join(join)) if join.kind.is_to_many() => {
            (MappingType::Nested, join.nullable)
        }
        FieldSource::Relation(Relation::Join(join)) => (MappingType::Object, join.nullable),
    };

    // Every non-aggregate source is a single value, never an array.
    (mapping_type, nullable, false)
}
/// Computes `(mapping_type, nullable, array)` for an aggregate relation.
///
/// - Count: non-nullable `Long`.
/// - Avg: nullable `Double`.
/// - Sum/min/max: nullable, typed after `value_type` when present and
///   `Double` otherwise.
/// - Ids: a non-nullable array typed after the element type.
fn aggregate_type(aggregate: &Aggregate) -> (MappingType, bool, bool) {
    match &aggregate.op {
        AggregateOp::Count => (MappingType::Long, false, false),
        AggregateOp::Avg(_) => (MappingType::Double, true, false),
        AggregateOp::Sum(_) | AggregateOp::Min(_) | AggregateOp::Max(_) => {
            let mapping_type = match &aggregate.value_type {
                Some(ty) => ty.opensearch(),
                // No known value type: fall back to a double.
                None => MappingType::Double,
            };
            (mapping_type, true, false)
        }
        AggregateOp::Ids { element_type } => (element_type.opensearch(), false, true),
    }
}
#[cfg(test)]
mod tests;
/// Picks the [`MappingType`] for a constant `value`.
///
/// Numeric and boolean values map to the matching numeric/boolean type
/// (decimals map to `Double`), all date/time variants map to `Date`, bytes map
/// to `"binary"`, maps map to `Object`, and strings, UUIDs and nulls map to
/// `Keyword`.
///
/// An array is typed after its first element, or `Keyword` when empty.
/// NOTE(review): only the first element is inspected, so an array that starts
/// with `Null` resolves to `Keyword` whatever follows — confirm this is intended.
fn constant_mapping_type(value: &GenericValue) -> MappingType {
    match value {
        GenericValue::String(_) | GenericValue::Uuid(_) | GenericValue::Null => {
            MappingType::Keyword
        }
        GenericValue::Bool(_) => MappingType::Boolean,
        GenericValue::SmallInt(_) => MappingType::Short,
        GenericValue::Int(_) => MappingType::Integer,
        GenericValue::BigInt(_) => MappingType::Long,
        GenericValue::Float(_) => MappingType::Float,
        GenericValue::Double(_) | GenericValue::Decimal(_) => MappingType::Double,
        GenericValue::Date(_)
        | GenericValue::Time(_)
        | GenericValue::Timestamp(_)
        | GenericValue::TimestampTz(_) => MappingType::Date,
        GenericValue::Bytes(_) => MappingType::Other("binary".to_owned()),
        GenericValue::Map(_) => MappingType::Object,
        GenericValue::Array(items) => match items.first() {
            Some(first) => constant_mapping_type(first),
            None => MappingType::Keyword,
        },
    }
}