// flusso-schema-core 0.2.0
//
// Shared domain vocabulary for flusso: values, validated newtypes, and index mappings.
// Documentation
//! Projecting a self-describing schema into a fully-typed mapping — without a
//! database.
//!
//! Every gap a thin config once left to the source is now stated in the schema:
//! a column field carries its [`FlussoType`](super::FlussoType) and nullability,
//! an aggregate its result type. So the mapping follows from the schema alone.
//! The structural rules are unchanged from when the source derived them — a
//! group is an `object`, a to-many join is a `nested` array, a `count` is a
//! non-null `long`, a primary key is never null — they just no longer need a
//! round-trip to ask.

use crate::common::{ColumnName, GenericValue, IndexName};

use super::{
    Aggregate, AggregateOp, Column, ContentHash, Field, FieldSource, IndexMapping, IndexSchema,
    Mapping, MappingType, Relation, ResolvedField,
};

impl IndexSchema {
    /// Build the fully-typed [`IndexMapping`] for `index` from this schema.
    ///
    /// The schema is only read; `index` is moved into the returned mapping.
    pub fn resolve(&self, index: IndexName) -> IndexMapping {
        let schema = self;
        resolve_index(index, schema)
    }
}

/// Assemble the [`IndexMapping`] for `index`: the schema's content hash plus
/// its top-level fields resolved against the schema's own primary key.
fn resolve_index(index: IndexName, schema: &IndexSchema) -> IndexMapping {
    // The hash is taken over the parsed schema rather than the file it came
    // from, so a structural change (a declared type included) alters it while
    // a purely cosmetic edit to the file does not.
    let hash = ContentHash::of(schema);
    let root_key = schema.primary_key.as_ref();
    let fields = resolve_fields(&schema.fields, root_key);

    IndexMapping {
        index,
        hash,
        fields,
    }
}

/// Resolve a list of sibling fields, preserving their order.
///
/// `primary_key` is the key column of the table whose row these fields are
/// read from, if one is known. It is handed unchanged to every field in
/// `fields`. When `resolve_field` descends into a group it passes the same key
/// on (a group stays on the same row); when it descends into a join it passes
/// the join's own `primary_key` instead, so the children of a join are checked
/// against the joined table's key rather than the root's.
fn resolve_fields(fields: &[Field], primary_key: Option<&ColumnName>) -> Vec<ResolvedField> {
    fields
        .iter()
        .map(|field| resolve_field(field, primary_key))
        .collect()
}

/// Resolve one field into a [`ResolvedField`]: its mapping (type plus the
/// field's own options), its nullability, and its resolved children.
///
/// `primary_key` is the key column of the table this field's row belongs to,
/// if known.
fn resolve_field(field: &Field, primary_key: Option<&ColumnName>) -> ResolvedField {
    let (mapping_type, nullable) = type_and_nullability(field, primary_key);

    let children = match &field.source {
        // A join moves onto the related table, so its children are resolved
        // against that table's own primary key.
        FieldSource::Relation(Relation::Join(join)) => {
            resolve_fields(&join.fields, Some(&join.primary_key))
        }
        // A group stays on the current row and keeps the current key.
        FieldSource::Group(members) => resolve_fields(members, primary_key),
        // Every other source is a leaf with no nested fields.
        FieldSource::Column(_)
        | FieldSource::Geo(_)
        | FieldSource::Constant(_)
        | FieldSource::Relation(Relation::Aggregate(_)) => Vec::new(),
    };

    ResolvedField {
        name: field.field.clone(),
        mapping: Mapping {
            mapping_type,
            extra: field.options.clone(),
        },
        nullable,
        children,
    }
}

/// Derive `(mapping type, nullable)` for a field from its source.
///
/// `primary_key` is only consulted for column fields: a column that is the
/// primary key, or that declares a default, is reported as non-null whatever
/// its declared nullability.
fn type_and_nullability(field: &Field, primary_key: Option<&ColumnName>) -> (MappingType, bool) {
    match &field.source {
        FieldSource::Column(Column {
            column,
            ty,
            nullable,
            default,
            ..
        }) => {
            let may_be_null = if default.is_some() || primary_key == Some(column) {
                false
            } else {
                *nullable
            };
            (ty.opensearch(), may_be_null)
        }
        // A group is always present as an object.
        FieldSource::Group(_) => (MappingType::Object, false),
        FieldSource::Geo(geo) => {
            let geo_point = MappingType::Other("geo_point".to_owned());
            (geo_point, geo.nullable)
        }
        // A constant is nullable only when it is literally null.
        FieldSource::Constant(value) => {
            let is_null = matches!(value, GenericValue::Null);
            (constant_mapping_type(value), is_null)
        }
        // To-many joins become a non-null nested array; any other join is an
        // object that may be null.
        FieldSource::Relation(Relation::Join(join)) => match join.kind.is_to_many() {
            true => (MappingType::Nested, false),
            false => (MappingType::Object, true),
        },
        FieldSource::Relation(Relation::Aggregate(aggregate)) => aggregate_type(aggregate),
    }
}

/// Derive `(mapping type, nullable)` for an aggregate from its operation.
///
/// `count` is a non-null `long` and `avg` a nullable `double`. `sum`, `min`
/// and `max` are nullable and take the aggregate's declared `value_type`.
fn aggregate_type(aggregate: &Aggregate) -> (MappingType, bool) {
    match &aggregate.op {
        AggregateOp::Count => (MappingType::Long, false),
        AggregateOp::Avg(_) => (MappingType::Double, true),
        AggregateOp::Sum(_) | AggregateOp::Min(_) | AggregateOp::Max(_) => {
            let declared = match aggregate.value_type.as_ref() {
                Some(ty) => ty.opensearch(),
                // Conversion requires a `value_type` for these ops, so this
                // `double` is a defensive fallback that should never be hit.
                None => MappingType::Double,
            };
            (declared, true)
        }
    }
}

/// Pick the mapping type implied by the shape of a constant value.
///
/// Strings and nulls map to `keyword`. An array takes the type of its first
/// element, or `keyword` when it is empty.
fn constant_mapping_type(value: &GenericValue) -> MappingType {
    match value {
        GenericValue::Null | GenericValue::String(_) => MappingType::Keyword,
        GenericValue::Bool(_) => MappingType::Boolean,
        GenericValue::Int(_) => MappingType::Long,
        GenericValue::Decimal(_) => MappingType::Double,
        GenericValue::Map(_) => MappingType::Object,
        GenericValue::Array(elements) => match elements.first() {
            Some(head) => constant_mapping_type(head),
            None => MappingType::Keyword,
        },
    }
}