use std::collections::BTreeMap;
use crate::error::{Result, SQLRiteError};
use crate::sql::db::table::{DataType, Value};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexOrigin {
Auto,
Explicit,
}
#[derive(Debug, Clone)]
pub struct SecondaryIndex {
pub name: String,
pub table_name: String,
pub column_name: String,
pub is_unique: bool,
pub origin: IndexOrigin,
pub entries: IndexEntries,
}
#[derive(Debug, Clone)]
pub enum IndexEntries {
Integer(BTreeMap<i64, Vec<i64>>),
Text(BTreeMap<String, Vec<i64>>),
}
impl SecondaryIndex {
pub fn new(
name: String,
table_name: String,
column_name: String,
datatype: &DataType,
is_unique: bool,
origin: IndexOrigin,
) -> Result<Self> {
let entries = match datatype {
DataType::Integer => IndexEntries::Integer(BTreeMap::new()),
DataType::Text => IndexEntries::Text(BTreeMap::new()),
other => {
return Err(SQLRiteError::General(format!(
"cannot build a secondary index on a {other} column"
)));
}
};
Ok(Self {
name,
table_name,
column_name,
is_unique,
origin,
entries,
})
}
#[allow(dead_code)]
pub fn synthesized_sql(&self) -> String {
let unique = if self.is_unique { "UNIQUE " } else { "" };
format!(
"CREATE {unique}INDEX {} ON {} ({});",
self.name, self.table_name, self.column_name
)
}
pub fn auto_name(table_name: &str, column_name: &str) -> String {
format!("sqlrite_autoindex_{table_name}_{column_name}")
}
pub fn would_violate_unique(&self, value: &Value) -> bool {
if !self.is_unique {
return false;
}
match (&self.entries, value) {
(IndexEntries::Integer(m), Value::Integer(v)) => m.contains_key(v),
(IndexEntries::Text(m), Value::Text(s)) => m.contains_key(s),
_ => false, }
}
pub fn insert(&mut self, value: &Value, rowid: i64) -> Result<()> {
match (&mut self.entries, value) {
(_, Value::Null) => Ok(()),
(IndexEntries::Integer(m), Value::Integer(v)) => {
m.entry(*v).or_default().push(rowid);
Ok(())
}
(IndexEntries::Text(m), Value::Text(s)) => {
m.entry(s.clone()).or_default().push(rowid);
Ok(())
}
(entries, value) => Err(SQLRiteError::Internal(format!(
"type mismatch inserting into index '{}': entries={entries:?}, value={value:?}",
self.name
))),
}
}
pub fn remove(&mut self, value: &Value, rowid: i64) {
match (&mut self.entries, value) {
(IndexEntries::Integer(m), Value::Integer(v)) => {
if let Some(list) = m.get_mut(v) {
list.retain(|r| *r != rowid);
if list.is_empty() {
m.remove(v);
}
}
}
(IndexEntries::Text(m), Value::Text(s)) => {
if let Some(list) = m.get_mut(s) {
list.retain(|r| *r != rowid);
if list.is_empty() {
m.remove(s);
}
}
}
_ => {}
}
}
pub fn lookup(&self, value: &Value) -> Vec<i64> {
match (&self.entries, value) {
(IndexEntries::Integer(m), Value::Integer(v)) => m.get(v).cloned().unwrap_or_default(),
(IndexEntries::Text(m), Value::Text(s)) => m.get(s).cloned().unwrap_or_default(),
_ => Vec::new(),
}
}
#[allow(dead_code)]
pub fn iter_entries(&self) -> Box<dyn Iterator<Item = (Value, i64)> + '_> {
match &self.entries {
IndexEntries::Integer(m) => Box::new(
m.iter()
.flat_map(|(v, rs)| rs.iter().map(|r| (Value::Integer(*v), *r))),
),
IndexEntries::Text(m) => Box::new(
m.iter()
.flat_map(|(v, rs)| rs.iter().map(|r| (Value::Text(v.clone()), *r))),
),
}
}
}
impl PartialEq for SecondaryIndex {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
&& self.table_name == other.table_name
&& self.column_name == other.column_name
&& self.is_unique == other.is_unique
&& self.origin == other.origin
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn auto_name_is_deterministic() {
assert_eq!(
SecondaryIndex::auto_name("users", "email"),
"sqlrite_autoindex_users_email"
);
}
#[test]
fn rejects_index_on_real_column() {
let err = SecondaryIndex::new(
"x".into(),
"t".into(),
"c".into(),
&DataType::Real,
false,
IndexOrigin::Explicit,
)
.unwrap_err();
assert!(format!("{err}").contains("cannot build"));
}
#[test]
fn unique_violation_detected_before_insert() {
let mut idx = SecondaryIndex::new(
"x".into(),
"t".into(),
"c".into(),
&DataType::Integer,
true,
IndexOrigin::Auto,
)
.unwrap();
assert!(!idx.would_violate_unique(&Value::Integer(1)));
idx.insert(&Value::Integer(1), 100).unwrap();
assert!(idx.would_violate_unique(&Value::Integer(1)));
assert!(!idx.would_violate_unique(&Value::Integer(2)));
}
#[test]
fn null_is_not_indexed_and_never_conflicts() {
let mut idx = SecondaryIndex::new(
"x".into(),
"t".into(),
"c".into(),
&DataType::Text,
true,
IndexOrigin::Auto,
)
.unwrap();
idx.insert(&Value::Null, 1).unwrap();
idx.insert(&Value::Null, 2).unwrap();
assert_eq!(idx.lookup(&Value::Null), Vec::<i64>::new());
assert!(!idx.would_violate_unique(&Value::Null));
}
#[test]
fn insert_and_remove_preserve_list_semantics() {
let mut idx = SecondaryIndex::new(
"x".into(),
"t".into(),
"c".into(),
&DataType::Text,
false,
IndexOrigin::Explicit,
)
.unwrap();
idx.insert(&Value::Text("a".into()), 1).unwrap();
idx.insert(&Value::Text("a".into()), 2).unwrap();
idx.insert(&Value::Text("a".into()), 3).unwrap();
assert_eq!(idx.lookup(&Value::Text("a".into())), vec![1, 2, 3]);
idx.remove(&Value::Text("a".into()), 2);
assert_eq!(idx.lookup(&Value::Text("a".into())), vec![1, 3]);
idx.remove(&Value::Text("a".into()), 1);
idx.remove(&Value::Text("a".into()), 3);
assert_eq!(idx.lookup(&Value::Text("a".into())), Vec::<i64>::new());
}
#[test]
fn iter_entries_yields_value_rowid_pairs_in_order() {
let mut idx = SecondaryIndex::new(
"x".into(),
"t".into(),
"c".into(),
&DataType::Integer,
false,
IndexOrigin::Explicit,
)
.unwrap();
idx.insert(&Value::Integer(20), 200).unwrap();
idx.insert(&Value::Integer(10), 100).unwrap();
idx.insert(&Value::Integer(10), 101).unwrap();
let pairs: Vec<(Value, i64)> = idx.iter_entries().collect();
assert_eq!(
pairs,
vec![
(Value::Integer(10), 100),
(Value::Integer(10), 101),
(Value::Integer(20), 200),
]
);
}
#[test]
fn synthesized_sql_round_trips_through_parser() {
let idx = SecondaryIndex::new(
"sqlrite_autoindex_users_name".into(),
"users".into(),
"name".into(),
&DataType::Text,
true,
IndexOrigin::Auto,
)
.unwrap();
let sql = idx.synthesized_sql();
assert_eq!(
sql,
"CREATE UNIQUE INDEX sqlrite_autoindex_users_name ON users (name);"
);
}
}