use tracing::debug;
use super::btree::{DOCUMENTS, INDEXES, SparseEngine, redb_err};
impl SparseEngine {
pub fn scan_documents(
&self,
tenant_id: u32,
collection: &str,
limit: usize,
) -> crate::Result<Vec<(String, Vec<u8>)>> {
let prefix = format!("{tenant_id}:{collection}:");
let end = format!("{tenant_id}:{collection}:\u{ffff}");
let read_txn = self.db.begin_read().map_err(|e| redb_err("read txn", e))?;
let table = read_txn
.open_table(DOCUMENTS)
.map_err(|e| redb_err("open table", e))?;
let range = table
.range(prefix.as_str()..end.as_str())
.map_err(|e| redb_err("doc range", e))?;
let mut results = Vec::with_capacity(limit.min(256));
for entry in range {
if results.len() >= limit {
break;
}
let entry = entry.map_err(|e| redb_err("doc entry", e))?;
let key = entry.0.value().to_string();
let doc_id = key.strip_prefix(&prefix).unwrap_or(&key).to_string();
let value = entry.1.value().to_vec();
results.push((doc_id, value));
}
debug!(collection, count = results.len(), "document scan");
Ok(results)
}
pub fn scan_index_groups(
&self,
tenant_id: u32,
collection: &str,
field: &str,
) -> crate::Result<Vec<(String, usize)>> {
let prefix = format!("{tenant_id}:{collection}:{field}:");
let end = format!("{tenant_id}:{collection}:{field}:\u{ffff}");
let read_txn = self.db.begin_read().map_err(|e| redb_err("read txn", e))?;
let table = read_txn
.open_table(INDEXES)
.map_err(|e| redb_err("open table", e))?;
let range = table
.range(prefix.as_str()..end.as_str())
.map_err(|e| redb_err("index range", e))?;
let mut groups: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
for entry in range {
let entry = entry.map_err(|e| redb_err("index entry", e))?;
let key = entry.0.value().to_string();
if let Some(rest) = key.strip_prefix(&prefix)
&& let Some(colon_pos) = rest.rfind(':')
{
let value = &rest[..colon_pos];
*groups.entry(value.to_string()).or_default() += 1;
}
}
let mut result: Vec<(String, usize)> = groups.into_iter().collect();
result.sort_by(|a, b| a.0.cmp(&b.0));
debug!(collection, field, groups = result.len(), "index group scan");
Ok(result)
}
pub fn scan_documents_filtered(
&self,
tenant_id: u32,
collection: &str,
limit: usize,
predicate: &dyn Fn(&[u8]) -> bool,
) -> crate::Result<Vec<(String, Vec<u8>)>> {
let prefix = format!("{tenant_id}:{collection}:");
let end = format!("{tenant_id}:{collection}:\u{ffff}");
let read_txn = self.db.begin_read().map_err(|e| redb_err("read txn", e))?;
let table = read_txn
.open_table(DOCUMENTS)
.map_err(|e| redb_err("open table", e))?;
let range = table
.range(prefix.as_str()..end.as_str())
.map_err(|e| redb_err("doc range", e))?;
let mut results = Vec::with_capacity(limit.min(256));
for entry in range {
if results.len() >= limit {
break;
}
let entry = entry.map_err(|e| redb_err("doc entry", e))?;
let value_bytes = entry.1.value();
if !predicate(value_bytes) {
continue;
}
let key = entry.0.value().to_string();
let doc_id = key.strip_prefix(&prefix).unwrap_or(&key).to_string();
results.push((doc_id, value_bytes.to_vec()));
}
debug!(collection, count = results.len(), "filtered document scan");
Ok(results)
}
pub fn export_documents(&self) -> crate::Result<Vec<(String, Vec<u8>)>> {
let txn = self.db.begin_read().map_err(|e| redb_err("read txn", e))?;
let table = txn
.open_table(DOCUMENTS)
.map_err(|e| redb_err("open docs", e))?;
let mut pairs = Vec::new();
let iter = table
.range::<&str>(..)
.map_err(|e| redb_err("iter docs", e))?;
for entry in iter {
let entry = entry.map_err(|e| redb_err("read doc entry", e))?;
pairs.push((entry.0.value().to_string(), entry.1.value().to_vec()));
}
Ok(pairs)
}
pub fn export_indexes(&self) -> crate::Result<Vec<(String, Vec<u8>)>> {
let txn = self.db.begin_read().map_err(|e| redb_err("read txn", e))?;
let table = txn
.open_table(INDEXES)
.map_err(|e| redb_err("open indexes", e))?;
let mut pairs = Vec::new();
let iter = table
.range::<&str>(..)
.map_err(|e| redb_err("iter indexes", e))?;
for entry in iter {
let entry = entry.map_err(|e| redb_err("read index entry", e))?;
pairs.push((entry.0.value().to_string(), entry.1.value().to_vec()));
}
Ok(pairs)
}
pub fn import_documents(&self, pairs: &[(String, Vec<u8>)]) -> crate::Result<()> {
let txn = self
.db
.begin_write()
.map_err(|e| redb_err("write txn", e))?;
{
let mut table = txn
.open_table(DOCUMENTS)
.map_err(|e| redb_err("open docs", e))?;
for (key, value) in pairs {
table
.insert(key.as_str(), value.as_slice())
.map_err(|e| redb_err("insert doc", e))?;
}
}
txn.commit().map_err(|e| redb_err("commit", e))?;
Ok(())
}
pub fn import_indexes(&self, pairs: &[(String, Vec<u8>)]) -> crate::Result<()> {
let txn = self
.db
.begin_write()
.map_err(|e| redb_err("write txn", e))?;
{
let mut table = txn
.open_table(INDEXES)
.map_err(|e| redb_err("open indexes", e))?;
for (key, value) in pairs {
table
.insert(key.as_str(), value.as_slice())
.map_err(|e| redb_err("insert idx", e))?;
}
}
txn.commit().map_err(|e| redb_err("commit", e))?;
Ok(())
}
}