icydb-core 0.180.4

IcyDB — A schema-first typed query engine and persistence runtime for Internet Computer canisters
Documentation
//! Module: predicate::encoding
//! Responsibility: shared predicate/value/coercion encoding primitives.
//! Does not own: predicate normalization policy or hash consumer boundaries.
//! Boundary: consumed by normalization sort-key generation and fingerprint hashing.

use crate::{
    db::{
        numeric::coerce_numeric_decimal,
        predicate::{CoercionId, CoercionSpec, CompareOp, Predicate, casefold_text},
    },
    value::{Value, ValueEnum, canonicalize_value_set},
};

// Leading tag byte written for each predicate variant in the sort-key
// encoding. Listed in ascending tag order; no constant in this list uses 0x0C.
const SORT_PRED_TRUE: u8 = 0x00;
const SORT_PRED_FALSE: u8 = 0x01;
const SORT_PRED_AND: u8 = 0x02;
const SORT_PRED_OR: u8 = 0x03;
const SORT_PRED_NOT: u8 = 0x04;
const SORT_PRED_COMPARE: u8 = 0x05;
const SORT_PRED_IS_NULL: u8 = 0x06;
const SORT_PRED_IS_MISSING: u8 = 0x07;
const SORT_PRED_IS_EMPTY: u8 = 0x08;
const SORT_PRED_IS_NOT_EMPTY: u8 = 0x09;
const SORT_PRED_IS_NOT_NULL: u8 = 0x0A;
const SORT_PRED_COMPARE_FIELDS: u8 = 0x0B;
const SORT_PRED_TEXT_CONTAINS: u8 = 0x0D;
const SORT_PRED_TEXT_CONTAINS_CI: u8 = 0x0E;

///
/// Build the deterministic sort-key bytes for `predicate`.
///
/// `IN` / `NOT IN` list literals are treated as not yet canonical, so they are
/// canonicalized while encoding.
///
#[must_use]
pub(in crate::db::predicate) fn encode_predicate_sort_key(predicate: &Predicate) -> Vec<u8> {
    let mut key_bytes = Vec::new();
    encode_predicate_sort_key_into(&mut key_bytes, predicate, false);

    key_bytes
}

///
/// Build the deterministic sort-key bytes for a predicate that has already
/// been normalized.
///
/// Intended only for planner-owned normalized predicates. The framing is the
/// same as `encode_predicate_sort_key(...)`; the difference is that `IN` /
/// `NOT IN` literal lists are not sorted or deduplicated again, because
/// schema-aware normalization is expected to have canonicalized them already.
///
#[must_use]
pub(in crate::db::predicate) fn encode_normalized_predicate_sort_key(
    predicate: &Predicate,
) -> Vec<u8> {
    let mut key_bytes = Vec::new();
    encode_predicate_sort_key_into(&mut key_bytes, predicate, true);

    key_bytes
}

// Write one predicate as `[tag:u8][payload]`. Child predicates, compare
// literals, text-match values, and coercion specs are each written as a
// length-prefixed frame so that adjacent segments cannot collide.
fn encode_predicate_sort_key_into(
    out: &mut Vec<u8>,
    predicate: &Predicate,
    compare_lists_already_canonical: bool,
) {
    // Every encoding starts with exactly one variant tag byte.
    let tag = match predicate {
        Predicate::True => SORT_PRED_TRUE,
        Predicate::False => SORT_PRED_FALSE,
        Predicate::And(_) => SORT_PRED_AND,
        Predicate::Or(_) => SORT_PRED_OR,
        Predicate::Not(_) => SORT_PRED_NOT,
        Predicate::Compare(_) => SORT_PRED_COMPARE,
        Predicate::CompareFields(_) => SORT_PRED_COMPARE_FIELDS,
        Predicate::IsNull { .. } => SORT_PRED_IS_NULL,
        Predicate::IsNotNull { .. } => SORT_PRED_IS_NOT_NULL,
        Predicate::IsMissing { .. } => SORT_PRED_IS_MISSING,
        Predicate::IsEmpty { .. } => SORT_PRED_IS_EMPTY,
        Predicate::IsNotEmpty { .. } => SORT_PRED_IS_NOT_EMPTY,
        Predicate::TextContains { .. } => SORT_PRED_TEXT_CONTAINS,
        Predicate::TextContainsCi { .. } => SORT_PRED_TEXT_CONTAINS_CI,
    };
    out.push(tag);

    // The payload layout is shared by variants that carry the same shape.
    match predicate {
        // Constants are fully described by their tag.
        Predicate::True | Predicate::False => {}

        // Child count, then each child in its own frame.
        Predicate::And(children) | Predicate::Or(children) => {
            push_len_u64(out, children.len());
            for child in children {
                push_predicate_sort_key_framed(out, child, compare_lists_already_canonical);
            }
        }

        Predicate::Not(inner) => {
            push_predicate_sort_key_framed(out, inner, compare_lists_already_canonical);
        }

        // Field name, operator tag, framed literal, framed coercion spec.
        Predicate::Compare(cmp) => {
            push_str_u64(out, &cmp.field);
            out.push(cmp.op.tag());
            push_compare_value_sort_key_framed(
                out,
                cmp.op,
                cmp.coercion.id,
                &cmp.value,
                compare_lists_already_canonical,
            );
            push_coercion_sort_key_framed(out, &cmp.coercion);
        }

        // Left field, operator tag, right field, framed coercion spec.
        Predicate::CompareFields(cmp) => {
            push_str_u64(out, &cmp.left_field);
            out.push(cmp.op.tag());
            push_str_u64(out, &cmp.right_field);
            push_coercion_sort_key_framed(out, &cmp.coercion);
        }

        // Field-only predicates: just the length-prefixed field name.
        Predicate::IsNull { field }
        | Predicate::IsNotNull { field }
        | Predicate::IsMissing { field }
        | Predicate::IsEmpty { field }
        | Predicate::IsNotEmpty { field } => push_str_u64(out, field),

        // Text-match predicates: field name followed by the framed value.
        Predicate::TextContains { field, value } | Predicate::TextContainsCi { field, value } => {
            push_str_u64(out, field);
            push_value_sort_key_framed(out, value);
        }
    }
}

// Append the canonical tag byte of `value`, followed by a payload whose layout
// depends on the variant. Arms are grouped by payload shape.
fn encode_value_sort_key_into(out: &mut Vec<u8>, value: &Value) {
    out.push(value.canonical_tag().to_u8());

    match value {
        // Tag-only variants carry no payload.
        Value::Null | Value::Unit => {}

        // Single-byte payload.
        Value::Bool(flag) => out.push(u8::from(*flag)),

        // Payloads appended as raw bytes, without a length prefix.
        Value::Date(date) => out.extend_from_slice(&date.as_days_since_epoch().to_be_bytes()),
        Value::Duration(span) => out.extend_from_slice(&span.as_millis().to_be_bytes()),
        Value::Timestamp(stamp) => out.extend_from_slice(&stamp.as_millis().to_be_bytes()),
        Value::Float32(num) => out.extend_from_slice(&num.to_be_bytes()),
        Value::Float64(num) => out.extend_from_slice(&num.to_be_bytes()),
        Value::Int64(num) => out.extend_from_slice(&num.to_be_bytes()),
        Value::Int128(num) => out.extend_from_slice(&num.to_be_bytes()),
        Value::Nat64(num) => out.extend_from_slice(&num.to_be_bytes()),
        Value::Nat128(num) => out.extend_from_slice(&num.to_be_bytes()),
        Value::Ulid(ulid) => out.extend_from_slice(&ulid.to_bytes()),

        // Decimals are normalized first, then written as sign flag, scale,
        // and mantissa.
        Value::Decimal(decimal) => {
            let normalized = decimal.normalize();
            out.push(u8::from(normalized.is_sign_negative()));
            out.extend_from_slice(&normalized.scale().to_be_bytes());
            out.extend_from_slice(&normalized.mantissa().to_be_bytes());
        }

        // Length-prefixed byte or text payloads.
        Value::Blob(bytes) => push_bytes_u64(out, bytes),
        Value::Text(text) => push_str_u64(out, text),
        Value::Principal(principal) => push_bytes_u64(out, principal.as_slice()),
        Value::Subaccount(sub) => push_bytes_u64(out, &sub.to_bytes()),
        Value::IntBig(big) => push_bytes_u64(out, &big.to_leb128()),
        Value::NatBig(big) => push_bytes_u64(out, &big.to_leb128()),

        // Owner bytes, then a presence byte (1/0) and the optional subaccount.
        Value::Account(account) => {
            push_bytes_u64(out, account.owner().as_slice());
            if let Some(sub) = account.subaccount() {
                out.push(1);
                push_bytes_u64(out, &sub.to_bytes());
            } else {
                out.push(0);
            }
        }

        Value::Enum(inner) => encode_enum_sort_key_into(out, inner),

        // Element count, then each element in its own frame, in list order.
        Value::List(items) => {
            push_len_u64(out, items.len());
            for item in items {
                push_value_sort_key_framed(out, item);
            }
        }

        // Entries are re-ordered through `Value::ordered_map_entries` at
        // encode time, so a `Value::Map` built directly in non-canonical
        // order cannot perturb sort-key determinism.
        Value::Map(entries) => {
            let ordered = Value::ordered_map_entries(entries);

            push_len_u64(out, ordered.len());
            for (entry_key, entry_value) in ordered {
                push_value_sort_key_framed(out, entry_key);
                push_value_sort_key_framed(out, entry_value);
            }
        }
    }
}

// Encode the literal side of a compare predicate.
//
// A `Value::List` under `IN` / `NOT IN` is written as the list tag, the item
// count, and one frame per item. Any other literal is canonicalized for the
// coercion and then encoded as a plain value.
fn encode_compare_value_sort_key_into(
    out: &mut Vec<u8>,
    op: CompareOp,
    coercion: CoercionId,
    value: &Value,
    compare_lists_already_canonical: bool,
) {
    let is_membership_op = matches!(op, CompareOp::In | CompareOp::NotIn);

    match value {
        Value::List(items) if is_membership_op => {
            out.push(value.canonical_tag().to_u8());

            if !compare_lists_already_canonical {
                // Caller made no ordering promise: canonicalize the whole list.
                let ordered = canonicalize_compare_literal_list_for_coercion(coercion, items);

                push_len_u64(out, ordered.len());
                for item in &ordered {
                    push_value_sort_key_framed(out, item);
                }
                return;
            }

            // List order is kept as given. Items are still rewritten one by
            // one for the coercions that change the literal itself.
            let rewrite_each_item = match coercion {
                CoercionId::Strict | CoercionId::CollectionElement => false,
                CoercionId::NumericWiden | CoercionId::TextCasefold => true,
            };

            push_len_u64(out, items.len());
            for item in items {
                if rewrite_each_item {
                    let canonical = canonicalize_compare_literal_for_coercion(coercion, item);
                    push_value_sort_key_framed(out, &canonical);
                } else {
                    push_value_sort_key_framed(out, item);
                }
            }
        }
        _ => {
            let canonical = canonicalize_compare_literal_for_coercion(coercion, value);
            encode_value_sort_key_into(out, &canonical);
        }
    }
}

// Return the form of `value` that is encoded for the given coercion.
//
// - `Strict` / `CollectionElement`: the value is cloned unchanged.
// - `NumericWiden`: becomes `Value::Decimal` when `coerce_numeric_decimal`
//   yields a decimal; otherwise cloned unchanged.
// - `TextCasefold`: `Value::Text` is passed through `casefold_text`; any
//   other variant is cloned unchanged.
fn canonicalize_compare_literal_for_coercion(coercion: CoercionId, value: &Value) -> Value {
    match coercion {
        CoercionId::Strict | CoercionId::CollectionElement => value.clone(),
        CoercionId::NumericWiden => match coerce_numeric_decimal(value) {
            Some(decimal) => Value::Decimal(decimal),
            None => value.clone(),
        },
        CoercionId::TextCasefold => match value {
            Value::Text(text) => Value::Text(casefold_text(text)),
            _ => value.clone(),
        },
    }
}

// Encode an enum value as: optional path, variant name, optional payload.
// Each optional part is preceded by a presence byte (1 = present, 0 = absent).
fn encode_enum_sort_key_into(out: &mut Vec<u8>, value: &ValueEnum) {
    if let Some(path) = value.path() {
        out.push(1);
        push_str_u64(out, path);
    } else {
        out.push(0);
    }

    push_str_u64(out, value.variant());

    if let Some(payload) = value.payload() {
        out.push(1);
        push_value_sort_key_framed(out, payload);
    } else {
        out.push(0);
    }
}

// Encode a coercion spec as one id byte, the parameter count, and then each
// parameter as a length-prefixed key followed by a length-prefixed value, in
// the iteration order of `spec.params`.
fn encode_coercion_sort_key_into(out: &mut Vec<u8>, spec: &CoercionSpec) {
    // Byte assigned to each coercion id; the value 2 is not used here.
    let id_byte: u8 = match spec.id {
        CoercionId::Strict => 0,
        CoercionId::NumericWiden => 1,
        CoercionId::TextCasefold => 3,
        CoercionId::CollectionElement => 4,
    };
    out.push(id_byte);

    push_len_u64(out, spec.params.len());
    for (param_name, param_value) in &spec.params {
        push_str_u64(out, param_name);
        push_str_u64(out, param_value);
    }
}

// Single normalization point for IN / NOT IN literal lists, so that sort-key
// and fingerprint encoding agree on the same deterministic form: each item is
// canonicalized for the coercion, then the resulting collection is passed
// through `canonicalize_value_set`.
fn canonicalize_compare_literal_list_for_coercion(
    coercion: CoercionId,
    items: &[Value],
) -> Vec<Value> {
    let mut canonical_items: Vec<Value> = Vec::with_capacity(items.len());
    canonical_items.extend(
        items
            .iter()
            .map(|item| canonicalize_compare_literal_for_coercion(coercion, item)),
    );
    canonicalize_value_set(&mut canonical_items);

    canonical_items
}

// Append `len` as an 8-byte big-endian `u64`.
fn push_len_u64(out: &mut Vec<u8>, len: usize) {
    // Sort keys are diagnostics-only, so a length that does not fit in `u64`
    // saturates to `u64::MAX` to keep the output deterministic.
    let saturated: u64 = len.try_into().unwrap_or(u64::MAX);
    let header = saturated.to_be_bytes();
    out.extend_from_slice(&header);
}

// Reserve a zeroed `[len:u64be]` header at the end of `out`. Returns the
// offset of the header and the offset at which the payload begins.
fn begin_framed(out: &mut Vec<u8>) -> (usize, usize) {
    let header_at = out.len();
    out.extend_from_slice(&[0u8; std::mem::size_of::<u64>()]);

    (header_at, out.len())
}

// Close a `[len:u64be][payload]` frame once the payload has been written: the
// number of bytes from `payload_start` to the end of `out` is stored
// big-endian in the 8-byte header at `len_pos`.
fn finish_framed(out: &mut [u8], len_pos: usize, payload_start: usize) {
    const HEADER_WIDTH: usize = std::mem::size_of::<u64>();

    // A `payload_start` beyond the end counts as an empty payload; a length
    // that does not fit in `u64` saturates.
    let written = out.len().saturating_sub(payload_start);
    let header = u64::try_from(written).unwrap_or(u64::MAX).to_be_bytes();

    let header_slot = &mut out[len_pos..len_pos + HEADER_WIDTH];
    header_slot.copy_from_slice(&header);
}

// Append one predicate encoding wrapped in a `[len:u64be][payload]` frame.
fn push_predicate_sort_key_framed(
    out: &mut Vec<u8>,
    predicate: &Predicate,
    compare_lists_already_canonical: bool,
) {
    let (header_at, body_at) = begin_framed(out);
    encode_predicate_sort_key_into(out, predicate, compare_lists_already_canonical);
    finish_framed(out, header_at, body_at);
}

// Append one value encoding wrapped in a `[len:u64be][payload]` frame.
fn push_value_sort_key_framed(out: &mut Vec<u8>, value: &Value) {
    let (header_at, body_at) = begin_framed(out);
    encode_value_sort_key_into(out, value);
    finish_framed(out, header_at, body_at);
}

// Append one compare-literal encoding wrapped in a `[len:u64be][payload]`
// frame.
fn push_compare_value_sort_key_framed(
    out: &mut Vec<u8>,
    op: CompareOp,
    coercion: CoercionId,
    value: &Value,
    compare_lists_already_canonical: bool,
) {
    let (header_at, body_at) = begin_framed(out);
    encode_compare_value_sort_key_into(out, op, coercion, value, compare_lists_already_canonical);
    finish_framed(out, header_at, body_at);
}

// Append one coercion-spec encoding wrapped in a `[len:u64be][payload]` frame.
fn push_coercion_sort_key_framed(out: &mut Vec<u8>, spec: &CoercionSpec) {
    let (header_at, body_at) = begin_framed(out);
    encode_coercion_sort_key_into(out, spec);
    finish_framed(out, header_at, body_at);
}

// Append `bytes` preceded by its length as written by `push_len_u64`.
fn push_bytes_u64(out: &mut Vec<u8>, bytes: &[u8]) {
    let byte_count = bytes.len();
    push_len_u64(out, byte_count);
    out.extend_from_slice(bytes);
}

// Append the UTF-8 bytes of `s` using the length-prefixed byte encoding.
fn push_str_u64(out: &mut Vec<u8>, s: &str) {
    let utf8 = s.as_bytes();
    push_bytes_u64(out, utf8);
}

///
/// TESTS
///

#[cfg(test)]
mod tests;