#![allow(dead_code)]
use std::sync::Arc;
use redb::ReadableTable;
use znippy_zoomies::stree::STree64;
use crate::error::RedbCatalogError;
use crate::store::COMMITS;
/// One indexed commit: the payload stored for a single snapshot id.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Entry {
/// Table-key portion decoded from the commit-log key (see `decode_commit_key`).
pub(crate) table_key: Box<str>,
/// Value stored under that commit-log key, copied verbatim from the table row.
pub(crate) metadata_location: Box<str>,
}
/// Immutable snapshot-id -> [`Entry`] index, built once from a set of rows.
///
/// `ids` and `entries` are parallel arrays: `entries[i]` belongs to `ids[i]`.
/// `from_rows` never constructs an empty index, so `min_id`/`max_id` are
/// always meaningful.
pub(crate) struct StaticIndex {
// Smallest indexed snapshot id (first element of the sorted `ids`).
min_id: i64,
// Largest indexed snapshot id (last element of the sorted `ids`).
max_id: i64,
// Search structure built over `ids`; lookups yield positions into `ids`/`entries`.
tree: STree64,
// Snapshot ids, sorted ascending with duplicates removed.
ids: Arc<[i64]>,
// Payloads, in the same order as `ids`.
entries: Arc<[Entry]>,
}
/// Compact `Debug` output: prints the id range and the number of entries
/// rather than the tree, the id array or the individual entries.
impl std::fmt::Debug for StaticIndex {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let entry_count = self.entries.len();
        let mut out = f.debug_struct("StaticIndex");
        out.field("min_id", &self.min_id);
        out.field("max_id", &self.max_id);
        out.field("len", &entry_count);
        out.finish()
    }
}
impl StaticIndex {
    /// Scans every row of the `COMMITS` table visible to `read` and builds an
    /// index from the rows whose key decodes to a snapshot id.
    ///
    /// Rows whose key is rejected by `decode_commit_key` are skipped silently.
    ///
    /// Returns `Ok(None)` when no row contributes an entry.
    ///
    /// # Errors
    /// Propagates failures from opening the table, starting the scan, or
    /// reading an individual row.
    pub(crate) fn build(read: &redb::ReadTransaction) -> Result<Option<Self>, RedbCatalogError> {
        let commits = read.open_table(COMMITS)?;
        let mut decoded: Vec<(i64, Entry)> = Vec::new();
        for item in commits.iter()? {
            let (key_guard, value_guard) = item?;
            let (snapshot_id, table_key) = match decode_commit_key(key_guard.value()) {
                Some(parts) => parts,
                // Not a decodable commit key: leave it out of the index.
                None => continue,
            };
            let entry = Entry {
                table_key: table_key.into(),
                metadata_location: value_guard.value().into(),
            };
            decoded.push((snapshot_id, entry));
        }
        Ok(Self::from_rows(decoded))
    }

    /// Builds the index from `(snapshot_id, entry)` pairs given in any order.
    ///
    /// Returns `None` for an empty input. When several rows share a snapshot
    /// id, only the one that appears first in `rows` is kept.
    fn from_rows(mut rows: Vec<(i64, Entry)>) -> Option<Self> {
        if rows.is_empty() {
            return None;
        }
        // The sort is stable, so among equal ids the earliest input row stays
        // in front and is the one `dedup_by_key` retains.
        rows.sort_by_key(|row| row.0);
        rows.dedup_by_key(|row| row.0);

        // Split into two parallel arrays that share the same ordering.
        let (ids, entries): (Vec<i64>, Vec<Entry>) = rows.into_iter().unzip();

        // `ids` is non-empty here (guarded above), so neither `?` can fire.
        let min_id = *ids.first()?;
        let max_id = *ids.last()?;
        let tree = STree64::new(&ids);

        Some(Self {
            min_id,
            max_id,
            tree,
            ids: ids.into(),
            entries: entries.into(),
        })
    }

    /// Smallest snapshot id in the index.
    pub(crate) fn min_id(&self) -> i64 {
        self.min_id
    }

    /// Largest snapshot id in the index.
    pub(crate) fn max_id(&self) -> i64 {
        self.max_id
    }

    /// Number of indexed entries.
    pub(crate) fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns the entry recorded for `snapshot_id`, or `None` when the tree
    /// reports no exact match.
    pub(crate) fn lookup(&self, snapshot_id: i64) -> Option<&Entry> {
        let slot = self.tree.find_exact(snapshot_id)?;
        // Debug builds only: the tree position must agree with `ids`.
        // NOTE(review): unlike `lookup_many`, release builds trust the tree
        // here -- confirm `find_exact` only ever reports exact matches.
        debug_assert_eq!(self.ids.get(slot).copied(), Some(snapshot_id));
        self.entries.get(slot)
    }

    /// Batched lookup: pairs each query in `snapshot_ids` with the tree's
    /// batch result, in order, and yields the matching entry or `None`.
    ///
    /// A reported position is accepted only when the id stored there equals
    /// the query; any other position is treated as a miss.
    pub(crate) fn lookup_many(&self, snapshot_ids: &[i64]) -> Vec<Option<&Entry>> {
        let hits = self.tree.batch_stream::<16>(snapshot_ids);
        hits.into_iter()
            .zip(snapshot_ids.iter().copied())
            .map(|(hit, wanted)| {
                let slot = hit.filter(|&i| self.ids.get(i) == Some(&wanted))?;
                self.entries.get(slot)
            })
            .collect()
    }
}
/// Splits a commit-log key into `(snapshot_id, table_key)`.
///
/// Layout as read by this function, from the end of the key:
/// * the last 8 bytes are a big-endian integer, reinterpreted as `i64`;
/// * the single byte before them is dropped without being inspected
///   (presumably a separator -- confirm against `crate::store::commit_key`);
/// * everything earlier is the table key and must be valid UTF-8.
///
/// Returns `None` when the key is shorter than 9 bytes or the table-key part
/// is not valid UTF-8.
fn decode_commit_key(key: &[u8]) -> Option<(i64, String)> {
    // Length of the table-key prefix; the subtraction fails exactly when the
    // key has fewer than 9 bytes.
    let prefix_len = key.len().checked_sub(9)?;

    let mut id_bytes = [0u8; 8];
    id_bytes.copy_from_slice(&key[prefix_len + 1..]);
    // Same bit pattern as decoding a `u64` and casting it to `i64`.
    let snapshot_id = i64::from_be_bytes(id_bytes);

    std::str::from_utf8(&key[..prefix_len])
        .ok()
        .map(|table_key| (snapshot_id, table_key.to_owned()))
}
// Unit tests for `StaticIndex` and `decode_commit_key`.
#[cfg(test)]
mod tests {
use super::*;
// Builds an `Entry` with a fixed table key and the given metadata location.
fn entry(loc: &str) -> Entry {
Entry {
table_key: "cat\x1fns\x1ft".into(),
metadata_location: loc.into(),
}
}
// A key produced by `crate::store::commit_key` must decode back to the same
// snapshot id and table key.
#[test]
fn decode_round_trips_commit_key() {
let k = crate::store::commit_key("cat\x1fns\x1ft", 123456789);
let (sid, tk) = decode_commit_key(&k).unwrap();
assert_eq!(sid, 123456789);
assert_eq!(tk, "cat\x1fns\x1ft");
}
// No rows means no index at all (`None`), not an empty index.
#[test]
fn empty_log_builds_nothing() {
assert!(StaticIndex::from_rows(vec![]).is_none());
}
// Rows are supplied out of order; the index must still report the right
// min/max and resolve each id, while absent ids (between and beyond the
// stored ones) miss.
#[test]
fn lookup_hits_and_misses() {
let rows = vec![(300, entry("c")), (100, entry("a")), (200, entry("b"))];
let idx = StaticIndex::from_rows(rows).unwrap();
assert_eq!(idx.min_id(), 100);
assert_eq!(idx.max_id(), 300);
assert_eq!(idx.len(), 3);
assert_eq!(idx.lookup(100).unwrap().metadata_location.as_ref(), "a");
assert_eq!(idx.lookup(200).unwrap().metadata_location.as_ref(), "b");
assert_eq!(idx.lookup(300).unwrap().metadata_location.as_ref(), "c");
assert!(idx.lookup(150).is_none());
assert!(idx.lookup(400).is_none());
}
// Stored ids are 0, 10, ..., 190 with locations "loc-0" .. "loc-19".
// Batched results must come back in query order, with `None` for ids that
// are not stored, and must agree with the single-id `lookup`.
#[test]
fn lookup_many_batches_hits_and_misses_in_order() {
let rows: Vec<(i64, Entry)> = (0..20)
.map(|i| (i * 10, entry(&format!("loc-{i}"))))
.collect();
let idx = StaticIndex::from_rows(rows).unwrap();
// Mix of stored ids (0, 190, 100, 30, 10) and absent ones (5, 999, 195).
let queries = [0, 5, 190, 100, 999, 30, 195, 10];
let got = idx.lookup_many(&queries);
let locs: Vec<Option<&str>> = got
.iter()
.map(|e| e.map(|x| x.metadata_location.as_ref()))
.collect();
assert_eq!(
locs,
vec![
Some("loc-0"), None, Some("loc-19"), Some("loc-10"), None, Some("loc-3"), None, Some("loc-1"), ]
);
// Single-query batches must match `lookup` for hits, gaps, and an id
// just past the largest stored one.
for q in [0i64, 5, 50, 190, 195, 200] {
let single = idx.lookup(q).map(|e| e.metadata_location.as_ref());
let batched = idx.lookup_many(&[q])[0].map(|e| e.metadata_location.as_ref());
assert_eq!(single, batched, "mismatch at {q}");
}
}
// End-to-end: write commits through `record_commit` into a temporary redb
// database, then build the index from a read transaction.
#[test]
fn builds_from_a_real_commit_log() {
use crate::store::{COMMITS, META, record_commit};
let dir = tempfile::tempdir().unwrap();
let db = redb::Database::create(dir.path().join("c.redb")).unwrap();
// First transaction only opens both tables (presumably so that they exist
// before anything is recorded -- confirm against redb's `open_table`).
{
let w = db.begin_write().unwrap();
{
let _ = w.open_table(COMMITS);
let _ = w.open_table(META);
}
w.commit().unwrap();
}
// Two commits carrying snapshot ids and one without; the assertions below
// expect only the first two to be indexed.
{
let w = db.begin_write().unwrap();
record_commit(&w, "cat\x1fns\x1ft", Some(42), "loc-42").unwrap();
record_commit(&w, "cat\x1fns\x1fu", Some(7), "loc-7").unwrap();
record_commit(&w, "cat\x1fns\x1ft", None, "ignored").unwrap();
w.commit().unwrap();
}
let read = db.begin_read().unwrap();
let idx = StaticIndex::build(&read).unwrap().unwrap();
assert_eq!(idx.len(), 2, "the snapshot-less commit adds no index entry");
assert_eq!(idx.lookup(42).unwrap().metadata_location.as_ref(), "loc-42");
assert_eq!(idx.lookup(42).unwrap().table_key.as_ref(), "cat\x1fns\x1ft");
assert_eq!(idx.lookup(7).unwrap().metadata_location.as_ref(), "loc-7");
assert_eq!(idx.lookup(7).unwrap().table_key.as_ref(), "cat\x1fns\x1fu");
assert_eq!(idx.min_id(), 7);
assert_eq!(idx.max_id(), 42);
}
}