exoware-sql 2026.4.1

SQL engine backed by the Exoware API.
Documentation
use datafusion::common::Result as DataFusionResult;

use crate::aggregate::{AggregateAccessPath, AggregatePushdownSpec};
use crate::codec::*;
use crate::filter::*;
use crate::predicate::*;
use crate::types::*;

#[derive(Debug, Clone)]
pub(crate) struct AccessPathDiagnostics {
    pub(crate) mode: String,
    pub(crate) predicate: String,
    pub(crate) exact: bool,
    pub(crate) row_recheck: bool,
    pub(crate) full_scan_like: bool,
    pub(crate) range_count: usize,
    pub(crate) constrained_prefix_len: Option<usize>,
}

#[derive(Debug, Clone)]
pub(crate) struct AggregatePushdownDiagnostics {
    pub(crate) grouped: bool,
    pub(crate) seed_job: Option<AccessPathDiagnostics>,
    pub(crate) aggregate_jobs: Vec<AccessPathDiagnostics>,
}

pub(crate) type ChosenAggregateAccessPath =
    (Vec<KeyRange>, AggregateAccessPath, Option<usize>, bool);

#[derive(Debug)]
pub(crate) struct KvAggregateTable {
    pub(crate) spec: AggregatePushdownSpec,
}

pub(crate) enum QueryStatsExplainSurface {
    StreamedRangeTrailer,
    RangeReduceHeader,
}

pub(crate) fn format_query_stats_explain(surface: QueryStatsExplainSurface) -> &'static str {
    match surface {
        QueryStatsExplainSurface::StreamedRangeTrailer => {
            "streamed_range(detail.read_stats: read_bytes=key+value bytes for rows read; ref RocksDB engine)"
        }
        QueryStatsExplainSurface::RangeReduceHeader => {
            "range_reduce(detail.read_stats: read_bytes=key+value bytes for rows read; ref RocksDB engine)"
        }
    }
}

pub(crate) fn format_access_path_diagnostics(diag: &AccessPathDiagnostics) -> String {
    let constrained_prefix = diag
        .constrained_prefix_len
        .map(|len| format!(", constrained_prefix={len}"))
        .unwrap_or_default();
    format!(
        "mode={}, predicate={}, exact={}, row_recheck={}, ranges={}, full_scan_like={}{}",
        diag.mode,
        diag.predicate,
        diag.exact,
        diag.row_recheck,
        diag.range_count,
        diag.full_scan_like,
        constrained_prefix
    )
}

pub(crate) fn build_scan_access_path_diagnostics(
    model: &TableModel,
    index_specs: &[ResolvedIndexSpec],
    predicate: &QueryPredicate,
    projection: &Option<Vec<usize>>,
) -> DataFusionResult<AccessPathDiagnostics> {
    if predicate.contradiction {
        return Ok(AccessPathDiagnostics {
            mode: "empty".to_string(),
            predicate: "FALSE".to_string(),
            exact: true,
            row_recheck: false,
            full_scan_like: false,
            range_count: 0,
            constrained_prefix_len: None,
        });
    }

    let access_plan = ScanAccessPlan::new(model, projection, predicate);
    if let Some(index_plan) = predicate.choose_index_plan(model, index_specs)? {
        let spec = &index_specs[index_plan.spec_idx];
        let exact = access_plan.predicate_fully_enforced_by_index_key(model, spec);
        return Ok(AccessPathDiagnostics {
            mode: format!(
                "secondary_index({}, {})",
                spec.name,
                match spec.layout {
                    IndexLayout::Lexicographic => "lexicographic",
                    IndexLayout::ZOrder => "z_order",
                }
            ),
            predicate: predicate.describe(model),
            exact,
            row_recheck: !exact,
            full_scan_like: false,
            range_count: index_plan.ranges.len(),
            constrained_prefix_len: Some(index_plan.constrained_prefix_len),
        });
    }

    let ranges = predicate.primary_key_ranges(model)?;
    let exact = access_plan.predicate_fully_enforced_by_primary_key(model);
    Ok(AccessPathDiagnostics {
        mode: "primary_key".to_string(),
        predicate: predicate.describe(model),
        exact,
        row_recheck: !exact,
        full_scan_like: is_primary_key_full_scan_like(model, &ranges),
        range_count: ranges.len(),
        constrained_prefix_len: None,
    })
}

pub(crate) fn build_aggregate_access_path_diagnostics(
    model: &TableModel,
    index_specs: &[ResolvedIndexSpec],
    predicate: &QueryPredicate,
    access_path: &AggregateAccessPath,
    ranges: &[KeyRange],
    constrained_prefix_len: Option<usize>,
    exact: bool,
) -> AccessPathDiagnostics {
    AccessPathDiagnostics {
        mode: match access_path {
            AggregateAccessPath::PrimaryKey => "primary_key".to_string(),
            AggregateAccessPath::SecondaryIndex { spec_idx } => {
                let spec = &index_specs[*spec_idx];
                format!(
                    "secondary_index({}, {})",
                    spec.name,
                    match spec.layout {
                        IndexLayout::Lexicographic => "lexicographic",
                        IndexLayout::ZOrder => "z_order",
                    }
                )
            }
        },
        predicate: predicate.describe(model),
        exact,
        row_recheck: !exact,
        full_scan_like: matches!(access_path, AggregateAccessPath::PrimaryKey)
            && is_primary_key_full_scan_like(model, ranges),
        range_count: ranges.len(),
        constrained_prefix_len,
    }
}

pub(crate) fn is_primary_key_full_scan_like(model: &TableModel, ranges: &[KeyRange]) -> bool {
    ranges.len() == 1
        && ranges[0].start == primary_key_prefix_range(model.table_prefix).start
        && ranges[0].end == primary_key_prefix_range(model.table_prefix).end
}