aedb 0.1.11

Embedded Rust storage engine with transactional commits, WAL durability, and snapshot-consistent reads
Documentation
use crate::catalog::schema::TableSchema;
use crate::catalog::types::{Row, Value};
use crate::error::AedbError;
use crate::query::operators::{compile_expr, eval_compiled_expr_public};
use crate::storage::encoded_key::{EncodedKey, prefix_successor};
use crate::storage::keyspace::{SecondaryIndex, SecondaryIndexStore};
use std::ops::Bound;

impl SecondaryIndex {
    pub fn insert(&mut self, key: EncodedKey, pk: EncodedKey) {
        match &mut self.store {
            SecondaryIndexStore::BTree(entries) => {
                let mut pks = entries.get(&key).cloned().unwrap_or_default();
                pks.insert(pk);
                entries.insert(key, pks);
            }
            SecondaryIndexStore::Hash(entries) => {
                let mut pks = entries.get(&key).cloned().unwrap_or_default();
                pks.insert(pk);
                entries.insert(key, pks);
            }
            SecondaryIndexStore::UniqueHash(entries) => {
                entries.insert(key, pk);
            }
        }
    }

    pub fn remove(&mut self, key: &EncodedKey, pk: &EncodedKey) {
        match &mut self.store {
            SecondaryIndexStore::BTree(entries) => {
                let Some(mut pks) = entries.get(key).cloned() else {
                    return;
                };
                pks.remove(pk);
                if pks.is_empty() {
                    entries.remove(key);
                } else {
                    entries.insert(key.clone(), pks);
                }
            }
            SecondaryIndexStore::Hash(entries) => {
                let Some(mut pks) = entries.get(key).cloned() else {
                    return;
                };
                pks.remove(pk);
                if pks.is_empty() {
                    entries.remove(key);
                } else {
                    entries.insert(key.clone(), pks);
                }
            }
            SecondaryIndexStore::UniqueHash(entries) => {
                entries.remove(key);
            }
        }
    }

    pub fn scan_eq(&self, key: &EncodedKey) -> Vec<EncodedKey> {
        match &self.store {
            SecondaryIndexStore::BTree(entries) => entries
                .get(key)
                .map(|pks| pks.iter().cloned().collect())
                .unwrap_or_default(),
            SecondaryIndexStore::Hash(entries) => entries
                .get(key)
                .map(|pks| pks.iter().cloned().collect())
                .unwrap_or_default(),
            SecondaryIndexStore::UniqueHash(entries) => entries
                .get(key)
                .map(|pk| vec![pk.clone()])
                .unwrap_or_default(),
        }
    }

    pub fn scan_range(&self, start: Bound<EncodedKey>, end: Bound<EncodedKey>) -> Vec<EncodedKey> {
        match &self.store {
            SecondaryIndexStore::BTree(entries) => entries
                .range((start, end))
                .flat_map(|(_, pks)| pks.iter().cloned())
                .collect(),
            SecondaryIndexStore::Hash(_) | SecondaryIndexStore::UniqueHash(_) => Vec::new(),
        }
    }

    pub fn scan_prefix(&self, prefix: &EncodedKey) -> Vec<EncodedKey> {
        if !matches!(self.store, SecondaryIndexStore::BTree(_)) {
            return Vec::new();
        }
        let Some(end) = prefix_successor(prefix) else {
            return self.scan_range(Bound::Included(prefix.clone()), Bound::Unbounded);
        };
        self.scan_range(Bound::Included(prefix.clone()), Bound::Excluded(end))
    }

    pub fn scan_prefix_window(
        &self,
        prefix: Option<&EncodedKey>,
        offset: usize,
        limit: usize,
    ) -> Vec<EncodedKey> {
        if limit == 0 {
            return Vec::new();
        }
        let SecondaryIndexStore::BTree(entries) = &self.store else {
            return Vec::new();
        };

        let mut out = Vec::with_capacity(limit);
        let mut skipped = 0usize;

        match prefix {
            None => {
                for (_, pks) in entries {
                    for pk in pks {
                        if skipped < offset {
                            skipped += 1;
                            continue;
                        }
                        if out.len() < limit {
                            out.push(pk.clone());
                        }
                        if out.len() >= limit {
                            return out;
                        }
                    }
                }
            }
            Some(prefix_key) => {
                let range_end = prefix_successor(prefix_key);
                let iter = match range_end {
                    Some(end) => {
                        entries.range((Bound::Included(prefix_key.clone()), Bound::Excluded(end)))
                    }
                    None => entries.range((Bound::Included(prefix_key.clone()), Bound::Unbounded)),
                };
                for (_, pks) in iter {
                    for pk in pks {
                        if skipped < offset {
                            skipped += 1;
                            continue;
                        }
                        if out.len() < limit {
                            out.push(pk.clone());
                        }
                        if out.len() >= limit {
                            return out;
                        }
                    }
                }
            }
        }

        out
    }

    pub fn rank_of_pk(&self, target_pk: &EncodedKey) -> Option<usize> {
        let SecondaryIndexStore::BTree(entries) = &self.store else {
            return None;
        };
        let mut rank = 0usize;
        for (_, pks) in entries {
            for pk in pks {
                if pk == target_pk {
                    return Some(rank);
                }
                rank += 1;
            }
        }
        None
    }

    pub fn unique_existing(&self, key: &EncodedKey) -> Option<EncodedKey> {
        match &self.store {
            SecondaryIndexStore::UniqueHash(entries) => entries.get(key).cloned(),
            _ => None,
        }
    }

    pub fn should_include_row(
        &self,
        row: &Row,
        schema: &TableSchema,
        table_name: &str,
    ) -> Result<bool, AedbError> {
        let Some(expr) = &self.partial_filter else {
            return Ok(true);
        };
        let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
        let compiled = compile_expr(expr, &columns, table_name)
            .map_err(|e| AedbError::Validation(format!("{e:?}")))?;
        Ok(eval_compiled_expr_public(&compiled, row))
    }
}

pub fn extract_index_key(
    row: &Row,
    schema: &TableSchema,
    indexed_columns: &[String],
) -> Result<Vec<Value>, AedbError> {
    let mut out = Vec::with_capacity(indexed_columns.len());
    for col in indexed_columns {
        let column_index = schema
            .columns
            .iter()
            .position(|c| c.name == *col)
            .ok_or_else(|| AedbError::Validation(format!("indexed column not found: {col}")))?;
        out.push(row.values[column_index].clone());
    }
    Ok(out)
}

pub fn extract_index_key_encoded(
    row: &Row,
    schema: &TableSchema,
    indexed_columns: &[String],
) -> Result<EncodedKey, AedbError> {
    let values = extract_index_key(row, schema, indexed_columns)?;
    Ok(EncodedKey::from_values(&values))
}

#[cfg(test)]
mod tests {
    use super::{extract_index_key, extract_index_key_encoded};
    use crate::catalog::schema::{ColumnDef, TableSchema};
    use crate::catalog::types::{ColumnType, Row, Value};
    use crate::storage::encoded_key::EncodedKey;
    use crate::storage::keyspace::SecondaryIndex;
    use std::ops::Bound;

    #[test]
    fn secondary_index_insert_remove_and_range() {
        let mut secondary_index = SecondaryIndex::default();
        secondary_index.insert(
            EncodedKey::from_values(&[Value::Integer(10)]),
            EncodedKey::from_values(&[Value::Integer(1)]),
        );
        secondary_index.insert(
            EncodedKey::from_values(&[Value::Integer(20)]),
            EncodedKey::from_values(&[Value::Integer(2)]),
        );
        secondary_index.insert(
            EncodedKey::from_values(&[Value::Integer(30)]),
            EncodedKey::from_values(&[Value::Integer(3)]),
        );

        let eq = secondary_index.scan_eq(&EncodedKey::from_values(&[Value::Integer(20)]));
        assert_eq!(eq, vec![EncodedKey::from_values(&[Value::Integer(2)])]);

        let range = secondary_index.scan_range(
            Bound::Included(EncodedKey::from_values(&[Value::Integer(15)])),
            Bound::Included(EncodedKey::from_values(&[Value::Integer(30)])),
        );
        assert_eq!(range.len(), 2);

        secondary_index.remove(
            &EncodedKey::from_values(&[Value::Integer(20)]),
            &EncodedKey::from_values(&[Value::Integer(2)]),
        );
        assert!(
            secondary_index
                .scan_eq(&EncodedKey::from_values(&[Value::Integer(20)]))
                .is_empty()
        );
    }

    #[test]
    fn extract_index_key_reads_schema_positions() {
        let schema = TableSchema {
            project_id: "p".into(),
            scope_id: "app".into(),
            table_name: "t".into(),
            owner_id: None,
            columns: vec![
                ColumnDef {
                    name: "id".into(),
                    col_type: ColumnType::Integer,
                    nullable: false,
                },
                ColumnDef {
                    name: "age".into(),
                    col_type: ColumnType::Integer,
                    nullable: false,
                },
            ],
            primary_key: vec!["id".into()],
            constraints: Vec::new(),
            foreign_keys: Vec::new(),
        };
        let row = Row::from_values(vec![Value::Integer(1), Value::Integer(42)]);
        let key = extract_index_key(&row, &schema, &["age".into()]).expect("extract");
        assert_eq!(key, vec![Value::Integer(42)]);
        let encoded = extract_index_key_encoded(&row, &schema, &["age".into()]).expect("encoded");
        assert_eq!(encoded, EncodedKey::from_values(&[Value::Integer(42)]));
    }
}