narrowdb 0.1.1

A lightweight columnar database engine for log and time-series data
Documentation
use anyhow::{Context, Result};
use ordered_float::OrderedFloat;

use crate::sql::{AggregateKind, CompareOp, Filter, Projection, ProjectionExpr};
use crate::storage::ColumnData;
use crate::types::{Schema, Value};

#[derive(Debug)]
pub(super) struct CompiledFilter {
    pub(super) column_index: usize,
    pub(super) op: CompareOp,
    pub(super) value: Value,
}

#[derive(Debug)]
pub(super) enum RowSelection {
    All(usize),
    Indexes(Vec<u32>),
}

impl RowSelection {
    pub(super) fn is_empty(&self) -> bool {
        match self {
            Self::All(rows) => *rows == 0,
            Self::Indexes(rows) => rows.is_empty(),
        }
    }

    pub(super) fn try_for_each<F>(&self, mut visitor: F) -> Result<()>
    where
        F: FnMut(usize) -> Result<()>,
    {
        match self {
            Self::All(rows) => {
                for row in 0..*rows {
                    visitor(row)?;
                }
            }
            Self::Indexes(rows) => {
                for row in rows {
                    visitor(*row as usize)?;
                }
            }
        }
        Ok(())
    }
}

#[derive(Debug, Clone)]
pub(super) struct CompiledProjection {
    pub(super) alias: String,
    pub(super) expr: CompiledProjectionExpr,
}

#[derive(Debug, Clone)]
pub(super) enum CompiledProjectionExpr {
    Column {
        column_index: usize,
        group_key_index: Option<usize>,
    },
    Aggregate {
        kind: AggregateKind,
        column_index: Option<usize>,
    },
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(super) enum LocalKeyPart {
    Int64(i64),
    Float64(OrderedFloat<f64>),
    Bool(bool),
    String(String),
    StringCode(u32),
}

impl LocalKeyPart {
    pub(super) fn from_column(column: &ColumnData, row: usize) -> Result<Self> {
        match column {
            ColumnData::Int64(values) => Ok(Self::Int64(values[row])),
            ColumnData::Float64(values) => Ok(Self::Float64(values[row])),
            ColumnData::Bool(values) => Ok(Self::Bool(values[row])),
            ColumnData::StringPlain(values) => Ok(Self::String(values[row].clone())),
            ColumnData::StringDict { codes, .. } => Ok(Self::StringCode(codes[row])),
        }
    }
}

pub(super) fn compile_filters(schema: &Schema, filters: &[Filter]) -> Result<Vec<CompiledFilter>> {
    let mut compiled = filters
        .iter()
        .map(|filter| {
            let column_index = column_index(schema, &filter.column)?;
            let data_type = schema.columns[column_index].data_type;
            let value = match filter.value.clone() {
                Some(v) => Value::cast_for(data_type, v)?,
                None => Value::Null,
            };
            Ok(CompiledFilter {
                column_index,
                op: filter.op,
                value,
            })
        })
        .collect::<Result<Vec<_>>>()?;
    compiled.sort_by_key(filter_priority);
    Ok(compiled)
}

fn filter_priority(filter: &CompiledFilter) -> u8 {
    match filter.op {
        CompareOp::Eq => 0,
        CompareOp::Gt | CompareOp::Gte | CompareOp::Lt | CompareOp::Lte => 1,
        CompareOp::NotEq => 2,
        CompareOp::IsNull | CompareOp::IsNotNull => 0,
    }
}

pub(super) fn compile_projections(
    schema: &Schema,
    group_by: &[String],
    projections: &[Projection],
) -> Result<Vec<CompiledProjection>> {
    projections
        .iter()
        .map(|projection| {
            let expr = match &projection.expr {
                ProjectionExpr::Column(name) => CompiledProjectionExpr::Column {
                    column_index: column_index(schema, name)?,
                    group_key_index: group_by
                        .iter()
                        .position(|group_column| group_column == name),
                },
                ProjectionExpr::Aggregate { kind, column } => CompiledProjectionExpr::Aggregate {
                    kind: *kind,
                    column_index: column
                        .as_ref()
                        .map(|name| column_index(schema, name))
                        .transpose()?,
                },
            };
            Ok(CompiledProjection {
                alias: projection.alias.clone(),
                expr,
            })
        })
        .collect()
}

pub(super) fn column_index(schema: &Schema, column_name: &str) -> Result<usize> {
    schema
        .columns
        .iter()
        .position(|column| column.name == column_name)
        .with_context(|| format!("unknown column {column_name}"))
}