use indexmap::IndexMap;
use redb::ReadableTable;
use std::collections::HashSet;
use vantage_core::{Result, error};
use vantage_table::table::Table;
use vantage_types::{Entity, Record};
use crate::condition::RedbCondition;
use crate::redb::helpers::{id_column_name, indexed_columns, paginate};
use crate::redb::{Redb, index_table_def, index_table_name, main_table_def};
use crate::types::{AnyRedbType, decode_record, value_to_index_key};
pub(crate) async fn load_filtered<E>(
db: &Redb,
table: &Table<Redb, E>,
) -> Result<IndexMap<String, Record<AnyRedbType>>>
where
E: Entity<AnyRedbType>,
{
let table_name = table.table_name();
let id_col = id_column_name(table);
let indexed = indexed_columns(table);
let conditions: Vec<RedbCondition> =
futures_util::future::try_join_all(table.conditions().map(|c| c.clone().resolve())).await?;
let mut out: IndexMap<String, Record<AnyRedbType>> = IndexMap::new();
if conditions.is_empty() {
let txn = db.begin_read()?;
let main = match txn.open_table(main_table_def(table_name)) {
Ok(t) => t,
Err(redb::TableError::TableDoesNotExist(_)) => return Ok(out),
Err(e) => {
return Err(error!(
"Failed to open table for scan",
details = e.to_string()
));
}
};
let iter = main
.iter()
.map_err(|e| error!("Failed to iterate redb table", details = e.to_string()))?;
for entry in iter {
let (k, v) =
entry.map_err(|e| error!("Failed to read redb row", details = e.to_string()))?;
out.insert(k.value().to_string(), decode_record(v.value())?);
}
return Ok(paginate(table, out));
}
let (first, rest) = conditions.split_first().unwrap();
let candidates = candidates_for(db, table_name, &id_col, &indexed, first)?;
let txn = db.begin_read()?;
let main = match txn.open_table(main_table_def(table_name)) {
Ok(t) => t,
Err(redb::TableError::TableDoesNotExist(_)) => return Ok(out),
Err(e) => {
return Err(error!(
"Failed to open table for verify",
details = e.to_string()
));
}
};
for id in candidates {
let row_bytes = match main
.get(id.as_str())
.map_err(|e| error!("Failed to read row by id", details = e.to_string()))?
{
Some(b) => b,
None => continue, };
let record = decode_record(row_bytes.value())?;
if rest
.iter()
.all(|c| condition_matches(c, &record, &id_col, &id))
{
out.insert(id, record);
}
}
Ok(paginate(table, out))
}
fn candidates_for(
db: &Redb,
table_name: &str,
id_col: &str,
indexed: &HashSet<String>,
cond: &RedbCondition,
) -> Result<Vec<String>> {
match cond {
RedbCondition::Eq { column, value } => {
if column == id_col {
return id_lookup(db, table_name, std::slice::from_ref(value));
}
require_indexed(indexed, column);
scan_index(db, table_name, column, value)
}
RedbCondition::In { column, values } => {
if column == id_col {
return id_lookup(db, table_name, values);
}
require_indexed(indexed, column);
let mut out = Vec::new();
let mut seen: HashSet<String> = HashSet::new();
for v in values {
for id in scan_index(db, table_name, column, v)? {
if seen.insert(id.clone()) {
out.push(id);
}
}
}
Ok(out)
}
RedbCondition::Deferred(_) => {
unreachable!("deferred condition reached candidates_for after resolve()")
}
}
}
fn require_indexed(indexed: &HashSet<String>, column: &str) {
if !indexed.contains(column) {
panic!(
"vantage-redb: condition on non-indexed column `{}` — flag the column with ColumnFlag::Indexed",
column
);
}
}
fn id_lookup(db: &Redb, table_name: &str, ids: &[AnyRedbType]) -> Result<Vec<String>> {
let txn = db.begin_read()?;
let main = match txn.open_table(main_table_def(table_name)) {
Ok(t) => t,
Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
Err(e) => {
return Err(error!(
"Failed to open main for id lookup",
details = e.to_string()
));
}
};
let mut out = Vec::with_capacity(ids.len());
for v in ids {
let id_str = match v.try_get::<String>() {
Some(s) => s,
None => continue,
};
if main
.get(id_str.as_str())
.map_err(|e| error!("Main get failed", details = e.to_string()))?
.is_some()
{
out.push(id_str);
}
}
Ok(out)
}
fn scan_index(
db: &Redb,
table_name: &str,
column: &str,
value: &AnyRedbType,
) -> Result<Vec<String>> {
let key_bytes = value_to_index_key(value)?;
let idx_name = index_table_name(table_name, column);
let txn = db.begin_read()?;
let idx = match txn.open_table(index_table_def(&idx_name)) {
Ok(t) => t,
Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
Err(e) => {
return Err(error!(
"Failed to open index table",
table = idx_name,
details = e.to_string()
));
}
};
let lo = (key_bytes.as_slice(), "");
let hi_id = "\u{10FFFF}".repeat(2);
let hi = (key_bytes.as_slice(), hi_id.as_str());
let iter = idx
.range(lo..=hi)
.map_err(|e| error!("Index range scan failed", details = e.to_string()))?;
let mut ids = Vec::new();
for entry in iter {
let (k, _) =
entry.map_err(|e| error!("Index iteration failed", details = e.to_string()))?;
let (vbytes, id) = k.value();
if vbytes != key_bytes.as_slice() {
break;
}
ids.push(id.to_string());
}
Ok(ids)
}
fn condition_matches(
cond: &RedbCondition,
record: &Record<AnyRedbType>,
id_col: &str,
id: &str,
) -> bool {
match cond {
RedbCondition::Eq { column, value } => {
if column == id_col {
value.try_get::<String>().as_deref() == Some(id)
} else {
record
.get(column.as_str())
.map(|v| v.value() == value.value())
.unwrap_or(false)
}
}
RedbCondition::In { column, values } => {
if column == id_col {
values
.iter()
.any(|v| v.try_get::<String>().as_deref() == Some(id))
} else if let Some(field) = record.get(column.as_str()) {
values.iter().any(|v| v.value() == field.value())
} else {
false
}
}
RedbCondition::Deferred(_) => false,
}
}