aedb 0.1.10

Embedded Rust storage engine with transactional commits, WAL durability, and snapshot-consistent reads
Documentation
use crate::catalog::types::{Row, Value};
use crate::query::error::QueryError;
use serde::{Deserialize, Serialize};

/// Builds the sort key of `row`: the values at the column positions named by
/// `sort_indices`, in the order the indices are listed.
///
/// The `Order` half of each pair is ignored here; only the column position is
/// used. When `sort_indices` is empty the key is a clone of every value in the
/// row.
///
/// # Panics
///
/// Panics if any column position in `sort_indices` is out of bounds for
/// `row.values`.
pub(super) fn extract_sort_key(
    row: &Row,
    sort_indices: &[(usize, crate::query::plan::Order)],
) -> Vec<Value> {
    match sort_indices {
        // No explicit ordering: the whole row acts as the key.
        [] => row.values.clone(),
        columns => {
            let mut key = Vec::with_capacity(columns.len());
            for (column, _) in columns {
                key.push(row.values[*column].clone());
            }
            key
        }
    }
}

/// Builds the primary-key key of `row`: the values at the positions listed in
/// `pk_indices`, in that order.
///
/// When `pk_indices` is empty the key is a clone of every value in the row.
///
/// # Panics
///
/// Panics if any position in `pk_indices` is out of bounds for `row.values`.
pub(super) fn extract_pk_key(row: &Row, pk_indices: &[usize]) -> Vec<Value> {
    if pk_indices.is_empty() {
        // No declared key columns: fall back to the full row.
        row.values.clone()
    } else {
        pk_indices
            .iter()
            .map(|&column| row.values[column].clone())
            .collect()
    }
}

/// Reports whether `row` lies strictly after the position stored in `cursor`.
///
/// The comparison proceeds in two stages:
///
/// 1. If `sort_indices` is non-empty, the row's sort-key values are compared
///    pairwise with `cursor.last_sort_key`. The first pair that differs
///    decides the result: for `Order::Asc` the row must be greater, for
///    `Order::Desc` it must be smaller.
/// 2. If every compared sort value is equal (or there are no sort columns),
///    the row's primary-key key must be strictly greater than
///    `cursor.last_pk`. This tie-break is ascending regardless of the sort
///    direction.
///
/// If `cursor.last_sort_key` holds fewer values than `sort_indices`, only the
/// shared prefix is compared before falling through to the primary-key check.
///
/// Key extraction is deferred until it is needed, so the pk-only path does not
/// clone a sort key it never reads, and the primary key is cloned only when
/// the sort columns do not already decide the outcome.
///
/// # Panics
///
/// Panics if a column position that has to be read is out of bounds for
/// `row.values`.
pub(super) fn row_after_cursor(
    row: &Row,
    cursor: &CursorToken,
    sort_indices: &[(usize, crate::query::plan::Order)],
    pk_indices: &[usize],
) -> bool {
    if !sort_indices.is_empty() {
        let row_sort = extract_sort_key(row, sort_indices);
        for ((_, order), (lhs, rhs)) in sort_indices
            .iter()
            .zip(row_sort.iter().zip(cursor.last_sort_key.iter()))
        {
            let cmp = lhs.cmp(rhs);
            if cmp.is_eq() {
                // Equal on this column: the next column (or the pk) decides.
                continue;
            }
            return match order {
                crate::query::plan::Order::Asc => cmp.is_gt(),
                crate::query::plan::Order::Desc => cmp.is_lt(),
            };
        }
    }
    // All compared sort columns tie (or none exist): break the tie on the pk.
    extract_pk_key(row, pk_indices) > cursor.last_pk
}

/// Pagination cursor state.
///
/// `encode_cursor` serializes this struct with `rmp_serde` and renders the
/// bytes as hex; `decode_cursor` reverses that.
///
/// NOTE(review): the serialized layout may depend on field order and names —
/// confirm against the `rmp_serde` configuration before reordering or renaming
/// fields, since previously issued cursors could stop decoding.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(super) struct CursorToken {
    /// Presumably the sequence number of the snapshot the paginated query
    /// reads from — not used in this file; confirm against the caller.
    pub(super) snapshot_seq: u64,
    /// Sort-key values that `row_after_cursor` compares each candidate row's
    /// sort key against.
    pub(super) last_sort_key: Vec<Value>,
    /// Primary-key values that `row_after_cursor` uses as the tie-break once
    /// the sort keys compare equal (or when there are no sort columns).
    pub(super) last_pk: Vec<Value>,
    /// Presumably the number of rows per page — not used in this file;
    /// confirm against the caller.
    pub(super) page_size: usize,
    /// Presumably how many rows of an overall query limit remain, `None` when
    /// no limit applies — not used in this file; confirm against the caller.
    pub(super) remaining_limit: Option<usize>,
}

/// Serializes `cursor` with `rmp_serde` and renders the resulting bytes as a
/// lowercase hexadecimal string (two characters per byte).
///
/// The output buffer is sized up front and filled from a digit table, so no
/// temporary `String` is allocated per byte.
///
/// # Errors
///
/// Returns `QueryError::InternalError` carrying the serializer's message if
/// `rmp_serde` fails to encode the cursor.
pub(super) fn encode_cursor(cursor: &CursorToken) -> Result<String, QueryError> {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let bytes = rmp_serde::to_vec(cursor).map_err(|e| QueryError::InternalError(e.to_string()))?;
    let mut encoded = String::with_capacity(bytes.len() * 2);
    for byte in bytes {
        // High nibble first, matching the `{:02x}` layout `decode_cursor` expects.
        encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    Ok(encoded)
}

/// Parses a hex-encoded cursor string back into a [`CursorToken`].
///
/// The input is read two hex characters at a time (upper- or lowercase), each
/// pair forming one byte, and the resulting bytes are deserialized with
/// `rmp_serde`.
///
/// # Errors
///
/// Returns `QueryError::InvalidQuery` with reason `"invalid cursor"` when the
/// input has odd length or contains a non-hex character, and with the
/// deserializer's message when the decoded bytes are not a valid cursor.
pub(super) fn decode_cursor(encoded: &str) -> Result<CursorToken, QueryError> {
    let invalid_cursor = || QueryError::InvalidQuery {
        reason: "invalid cursor".into(),
    };

    let raw = encoded.as_bytes();
    if !raw.len().is_multiple_of(2) {
        return Err(invalid_cursor());
    }

    // Collecting into `Option` stops at the first pair that is not valid hex.
    let payload = raw
        .chunks_exact(2)
        .map(|pair| match pair {
            [hi, lo] => {
                let high = decode_hex_nibble(*hi)?;
                let low = decode_hex_nibble(*lo)?;
                Some((high << 4) | low)
            }
            _ => None,
        })
        .collect::<Option<Vec<u8>>>()
        .ok_or_else(invalid_cursor)?;

    rmp_serde::from_slice(&payload).map_err(|e| QueryError::InvalidQuery {
        reason: e.to_string(),
    })
}

/// Converts one ASCII hexadecimal digit (`0-9`, `a-f`, `A-F`) to its numeric
/// value in `0..=15`.
///
/// Returns `None` for any other byte.
fn decode_hex_nibble(byte: u8) -> Option<u8> {
    char::from(byte)
        .to_digit(16)
        // A base-16 digit is always below 16, so narrowing to `u8` is lossless.
        .map(|digit| digit as u8)
}