use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex as StdMutex};
use arc_swap::ArcSwapOption;
use iceberg::{Error, ErrorKind};
use redb::{Database, ReadableTable, TableDefinition};
use tokio::sync::{Mutex, oneshot};
use crate::pointer_cache::PointerCache;
use crate::static_index::StaticIndex;
/// What a successful [`CommitFn`] asks the store to mirror into its in-memory pointer cache
/// once the enclosing group-commit write transaction has committed.
#[derive(Default)]
pub(crate) struct CommitOutcome {
    /// `(key, metadata_location)` pair to insert into the pointer cache.
    pub(crate) mirror_insert: Option<(String, String)>,
    /// Key to remove from the pointer cache.
    pub(crate) mirror_remove: Option<String>,
}

impl CommitOutcome {
    /// Outcome that only inserts `key -> loc` into the pointer cache (nothing is removed).
    pub(crate) fn insert(key: impl Into<String>, loc: impl Into<String>) -> Self {
        let entry = (key.into(), loc.into());
        Self {
            mirror_insert: Some(entry),
            ..Self::default()
        }
    }
}
/// Unit of work handed to `Store::group_commit`.
///
/// It runs inside the batch's shared write transaction and returns what to mirror into the
/// pointer cache. Closures in a batch are not rolled back individually: if a closure returns
/// `Err` after writing, those writes are still committed with the rest of the batch.
pub(crate) type CommitFn = Box<
    dyn FnOnce(&redb::WriteTransaction) -> iceberg::Result<CommitOutcome> + Send,
>;

/// A queued commit: the work to apply plus the channel its caller is waiting on.
struct PendingCommit {
    apply: CommitFn,
    done: oneshot::Sender<iceberg::Result<CommitOutcome>>,
}

// Written by hand because the boxed closure has no `Debug`; `Store`'s derived `Debug`
// needs it for the commit queue.
impl std::fmt::Debug for PendingCommit {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "PendingCommit")
    }
}
/// `namespaces` table: namespace key -> raw byte payload.
pub(crate) const NAMESPACES: TableDefinition<&str, &[u8]> = TableDefinition::new("namespaces");
/// `namespace_props` table: string key -> string value.
pub(crate) const NAMESPACE_PROPS: TableDefinition<&str, &str> =
    TableDefinition::new("namespace_props");
/// `tables` table: table key -> metadata location; loaded into the pointer cache on open.
pub(crate) const TABLES: TableDefinition<&str, &str> = TableDefinition::new("tables");
/// `commits` log: [`commit_key`] (table key + snapshot id) -> metadata location.
pub(crate) const COMMITS: TableDefinition<&[u8], &str> = TableDefinition::new("commits");
/// `meta` table: named `u64` counters, keyed by the `META_*` constants.
pub(crate) const META: TableDefinition<&str, u64> = TableDefinition::new("meta");
/// Commit sequence number, incremented by every [`record_commit`] call.
pub(crate) const META_COMMIT_SEQ: &str = "commit_seq";
/// Commits recorded since the last compaction; reset to 0 by `Store::compact_with_db`.
pub(crate) const META_UPDATE_COUNTER: &str = "update_counter";
/// Counter key that nothing in this module reads or writes yet.
// NOTE(review): presumably reserved for a future live-tail miss counter; the allow silences
// the unused-constant lint until a writer exists. Confirm, or remove the constant.
#[allow(dead_code)]
pub(crate) const META_LIVE_TAIL_MISSES: &str = "live_tail_misses";
/// Number of successful group-commit closures after which a background compaction starts.
pub(crate) const COMPACT_THRESHOLD: u64 = 1024;
/// Builds the [`COMMITS`] key for `(table_key, snapshot_id)`.
///
/// Layout: the UTF-8 bytes of `table_key`, a `0x1f` separator, then the snapshot id's
/// two's-complement bits as 8 big-endian bytes. The id has a fixed width, so distinct pairs
/// always give distinct keys. Within one table, keys sort by snapshot id only for non-negative
/// ids; negative ids sort after all non-negative ones.
///
/// Note that a prefix scan on `table_key + 0x1f` also matches any other table key that itself
/// starts with that prefix.
pub(crate) fn commit_key(table_key: &str, snapshot_id: i64) -> Vec<u8> {
    const SEPARATOR: u8 = 0x1f;
    // Same bytes as `(snapshot_id as u64).to_be_bytes()`.
    let id_bytes = snapshot_id.to_be_bytes();
    let parts: [&[u8]; 3] = [table_key.as_bytes(), &[SEPARATOR], &id_bytes];
    parts.concat()
}
/// Records one commit inside `write` and bumps the catalog counters.
///
/// When `snapshot_id` is `Some(id)`, `metadata_location` is inserted into [`COMMITS`] under
/// `commit_key(table_key, id)`. In every case [`META_COMMIT_SEQ`] and [`META_UPDATE_COUNTER`]
/// are incremented by one; a missing counter counts as 0.
///
/// Returns the new commit sequence number. Nothing is durable until the caller commits `write`.
///
/// # Errors
/// Any redb table or storage error, converted into `RedbCatalogError`.
pub(crate) fn record_commit(
    write: &redb::WriteTransaction,
    table_key: &str,
    snapshot_id: Option<i64>,
    metadata_location: &str,
) -> Result<u64, crate::error::RedbCatalogError> {
    if let Some(sid) = snapshot_id {
        let key = commit_key(table_key, sid);
        // Scoped so the log table is closed before `META` is opened.
        let mut log = write.open_table(COMMITS)?;
        log.insert(key.as_slice(), metadata_location)?;
    }
    let mut meta = write.open_table(META)?;
    let next_seq = 1 + meta.get(META_COMMIT_SEQ)?.map_or(0, |g| g.value());
    meta.insert(META_COMMIT_SEQ, next_seq)?;
    let next_updates = 1 + meta.get(META_UPDATE_COUNTER)?.map_or(0, |g| g.value());
    meta.insert(META_UPDATE_COUNTER, next_updates)?;
    Ok(next_seq)
}
#[derive(Debug, Clone)]
pub(crate) struct Store {
pub(crate) path: PathBuf,
pub(crate) db: Arc<Mutex<Database>>,
pub(crate) pointers: PointerCache,
pub(crate) durability: redb::Durability,
pub(crate) static_index: Arc<ArcSwapOption<StaticIndex>>,
pub(crate) cutoff: Arc<AtomicI64>,
pub(crate) compacting: Arc<AtomicBool>,
pub(crate) writes_since_compaction: Arc<AtomicU64>,
commit_queue: Arc<StdMutex<VecDeque<PendingCommit>>>,
}
impl Store {
    /// Opens (creating if needed) the redb database at `path` and warm-starts in-memory state.
    ///
    /// Opening proceeds in order:
    /// - ensures every catalog table exists;
    /// - loads the `TABLES` rows into the [`PointerCache`];
    /// - builds the [`StaticIndex`] and its cutoff (`i64::MIN` when there is no index);
    /// - seeds `writes_since_compaction` from the persisted `META_UPDATE_COUNTER`.
    ///
    /// All warm-start reads come from a single read snapshot.
    ///
    /// `durability` is used for the write transactions later issued by `group_commit` and
    /// `compact_with_db`.
    ///
    /// # Errors
    /// Failures are reported as [`redb::DatabaseError`]. Errors that are not already database
    /// errors (table mismatches, index build failures, commit failures, ...) are wrapped as
    /// `StorageError::Io` carrying their message.
    pub(crate) fn open(
        path: PathBuf,
        durability: redb::Durability,
    ) -> Result<Self, redb::DatabaseError> {
        let db = Database::create(&path)?;
        let write = db.begin_write().map_err(|e| match e {
            redb::TransactionError::Storage(s) => redb::DatabaseError::Storage(s),
            other => other_db_err(other),
        })?;
        {
            // Creating the tables up front lets later read transactions open them. Failures
            // (e.g. a table persisted with different key/value types) surface here instead of
            // being silently ignored.
            let table_err = |e: redb::TableError| match e {
                redb::TableError::Storage(s) => redb::DatabaseError::Storage(s),
                other => other_db_err(other),
            };
            drop(write.open_table(NAMESPACES).map_err(table_err)?);
            drop(write.open_table(NAMESPACE_PROPS).map_err(table_err)?);
            drop(write.open_table(TABLES).map_err(table_err)?);
            drop(write.open_table(COMMITS).map_err(table_err)?);
            drop(write.open_table(META).map_err(table_err)?);
        }
        write.commit().map_err(other_db_err)?;

        // One read transaction for all warm-start state, so it reflects a single snapshot.
        let (pointers, static_index, pending) = {
            let read = db.begin_read().map_err(other_db_err)?;
            let pointers = {
                let tbl = read.open_table(TABLES).map_err(other_db_err)?;
                let mut entries: Vec<(String, String)> = Vec::new();
                for row in tbl.iter().map_err(other_db_err)? {
                    let (k, v) = row.map_err(other_db_err)?;
                    entries.push((k.value().to_string(), v.value().to_string()));
                }
                PointerCache::from_entries(entries)
            };
            let idx = StaticIndex::build(&read).map_err(other_db_err)?;
            let pending = read
                .open_table(META)
                .ok()
                .and_then(|m| m.get(META_UPDATE_COUNTER).ok().flatten().map(|v| v.value()))
                .unwrap_or(0);
            (pointers, idx.map(Arc::new), pending)
        };
        let cutoff = static_index
            .as_ref()
            .map(|i| i.max_id())
            .unwrap_or(i64::MIN);

        Ok(Self {
            path,
            db: Arc::new(Mutex::new(db)),
            pointers,
            durability,
            static_index: Arc::new(ArcSwapOption::from(static_index)),
            cutoff: Arc::new(AtomicI64::new(cutoff)),
            compacting: Arc::new(AtomicBool::new(false)),
            writes_since_compaction: Arc::new(AtomicU64::new(pending)),
            commit_queue: Arc::new(StdMutex::new(VecDeque::new())),
        })
    }

    /// Runs `apply` inside a write transaction shared with concurrent callers.
    ///
    /// Batching works as follows:
    /// - Each caller enqueues its closure, then takes the database lock.
    /// - Whichever caller holds the lock drains the whole queue and applies every closure in
    ///   one redb write transaction, then answers every waiter.
    /// - Callers whose closures were drained by someone else find the queue empty and simply
    ///   await their result.
    ///
    /// After a successful commit, each `Ok` outcome's `mirror_insert` / `mirror_remove` is
    /// applied to the pointer cache in queue order. `maybe_trigger_compaction` runs once per
    /// successful closure.
    ///
    /// Closures are not rolled back individually. A closure that returns `Err` after writing
    /// leaves those writes in the batch, and they are committed with it. Closures should
    /// validate before they write.
    ///
    /// If a caller's future is dropped before its closure is drained, the closure is skipped
    /// rather than applied later on another caller's behalf. It stays in the queue until the
    /// next caller drains it.
    ///
    /// # Errors
    /// Returns the closure's own error. Returns `ErrorKind::Unexpected` if the transaction
    /// could not be begun or committed, or if the result channel was dropped (e.g. a closure
    /// in the batch panicked).
    pub(crate) async fn group_commit(&self, apply: CommitFn) -> iceberg::Result<CommitOutcome> {
        let (tx, rx) = oneshot::channel();
        self.commit_queue
            .lock()
            .expect("commit queue mutex poisoned")
            .push_back(PendingCommit { apply, done: tx });
        {
            let db = self.db.lock().await;
            let batch: Vec<PendingCommit> = self
                .commit_queue
                .lock()
                .expect("commit queue mutex poisoned")
                .drain(..)
                .collect();
            if !batch.is_empty() {
                self.apply_batch(&db, batch);
            }
        }
        rx.await.map_err(|_| {
            Error::new(
                ErrorKind::Unexpected,
                "group commit worker dropped the result",
            )
        })?
    }

    /// Applies `batch` in one write transaction, commits it and answers every live waiter.
    ///
    /// `db` must be the database behind `self.db`, with its lock held by the caller.
    fn apply_batch(&self, db: &Database, batch: Vec<PendingCommit>) {
        let mut write = match db.begin_write() {
            Ok(write) => write,
            Err(e) => {
                let msg = format!("begin_write failed: {e}");
                for PendingCommit { done, .. } in batch {
                    let _ = done.send(Err(Error::new(ErrorKind::Unexpected, msg.clone())));
                }
                return;
            }
        };
        write.set_durability(self.durability);

        let mut results: Vec<(
            oneshot::Sender<iceberg::Result<CommitOutcome>>,
            iceberg::Result<CommitOutcome>,
        )> = Vec::with_capacity(batch.len());
        let mut applied = 0u64;
        for PendingCommit { apply, done } in batch {
            // A closed receiver means the caller's future was dropped before its commit was
            // drained; nobody can observe the result, so the closure is not run at all.
            if done.is_closed() {
                continue;
            }
            let r = apply(&write);
            if r.is_ok() {
                applied += 1;
            }
            results.push((done, r));
        }

        match write.commit() {
            Ok(()) => {
                for (done, r) in results {
                    if let Ok(out) = &r {
                        if let Some((k, loc)) = &out.mirror_insert {
                            self.pointers.insert(k, loc);
                        }
                        if let Some(k) = &out.mirror_remove {
                            self.pointers.remove(k);
                        }
                    }
                    let _ = done.send(r);
                }
                for _ in 0..applied {
                    self.maybe_trigger_compaction();
                }
            }
            Err(e) => {
                let msg = format!("group commit failed: {e}");
                for (done, r) in results {
                    // A closure that already failed reports its own, more specific error.
                    let reply = match r {
                        Ok(_) => Err(Error::new(ErrorKind::Unexpected, msg.clone())),
                        Err(own) => Err(own),
                    };
                    let _ = done.send(reply);
                }
            }
        }
    }

    /// Rebuilds the static index from `db` and resets the compaction counters.
    ///
    /// Work proceeds in order:
    /// - publishes the new index, then its cutoff (`i64::MIN` when there is no index);
    /// - writes `META_UPDATE_COUNTER = 0` in its own write transaction;
    /// - zeroes `writes_since_compaction`.
    ///
    /// Returns the number of entries in the new index, or 0 when there is none.
    ///
    /// `db` is expected to be the database behind `self.db`, with its lock held. Then no
    /// commit can land between the index build and the counter reset. The signature does not
    /// enforce this.
    ///
    /// # Errors
    /// Any redb error. If the counter reset fails, the new index has already been published
    /// and the counters are left unchanged.
    pub(crate) fn compact_with_db(
        &self,
        db: &Database,
    ) -> Result<usize, crate::error::RedbCatalogError> {
        let idx = {
            let read = db.begin_read()?;
            StaticIndex::build(&read)?
        };
        let count = idx.as_ref().map(|i| i.len()).unwrap_or(0);
        let max = idx.as_ref().map(|i| i.max_id()).unwrap_or(i64::MIN);
        self.static_index.store(idx.map(Arc::new));
        self.cutoff.store(max, Ordering::Release);
        {
            let mut w = db.begin_write()?;
            w.set_durability(self.durability);
            {
                let mut meta = w.open_table(META)?;
                meta.insert(META_UPDATE_COUNTER, 0u64)?;
            }
            w.commit()?;
        }
        self.writes_since_compaction.store(0, Ordering::Release);
        Ok(count)
    }

    /// Counts one applied write and may start a background compaction.
    ///
    /// A compaction starts once `COMPACT_THRESHOLD` is reached, unless one is already pending
    /// or running. It runs on a dedicated OS thread that blocks on `self.db`. Because
    /// `group_commit` calls this while holding that lock, compaction starts after the current
    /// batch releases it.
    pub(crate) fn maybe_trigger_compaction(&self) {
        // Clears the `compacting` flag when dropped, so it is released even if the compaction
        // panics.
        struct ClearOnDrop(Arc<AtomicBool>);

        impl Drop for ClearOnDrop {
            fn drop(&mut self) {
                self.0.store(false, Ordering::Release);
            }
        }

        let n = self.writes_since_compaction.fetch_add(1, Ordering::AcqRel) + 1;
        if n < COMPACT_THRESHOLD {
            return;
        }
        if self
            .compacting
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            // Another compaction is already pending or running.
            return;
        }
        let me = self.clone();
        let spawned = std::thread::Builder::new()
            .name("redb-compaction".to_string())
            .spawn(move || {
                let _clear = ClearOnDrop(Arc::clone(&me.compacting));
                let db = me.db.blocking_lock();
                // Best-effort. On failure the counters are not reset, so a later commit retries.
                let _ = me.compact_with_db(&db);
            });
        if spawned.is_err() {
            // The closure (and its guard) never ran; release the flag so a later commit retries.
            self.compacting.store(false, Ordering::Release);
        }
    }
}
/// Wraps any displayable error as a [`redb::DatabaseError`].
///
/// The result is a `StorageError::Io` whose message is the error's `Display` text. It is used
/// where the surrounding function must return `DatabaseError`.
fn other_db_err(e: impl std::fmt::Display) -> redb::DatabaseError {
    let io = std::io::Error::other(e.to_string());
    redb::DatabaseError::Storage(redb::StorageError::Io(io))
}
#[cfg(test)]
mod tests {
    use super::*;
    use redb::Database;

    /// Commit keys of one table order by snapshot id, and different tables get different keys.
    #[test]
    fn commit_key_orders_ascending_by_snapshot() {
        let tk = "cat\x1fns\x1ft";
        assert!(commit_key(tk, 1) < commit_key(tk, 2));
        assert!(commit_key(tk, 2) < commit_key(tk, 1_000_000));
        assert_ne!(
            commit_key("cat\x1fns\x1fa", 1),
            commit_key("cat\x1fns\x1fb", 1)
        );
    }

    /// Many concurrent `group_commit` calls all land in both redb and the pointer cache.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn group_commit_lands_all_concurrent_commits() {
        let dir = tempfile::tempdir().unwrap();
        let store = Store::open(dir.path().join("c.redb"), redb::Durability::Immediate).unwrap();
        let n = 64usize;
        let mut handles = Vec::new();
        for i in 0..n {
            let s = store.clone();
            handles.push(tokio::spawn(async move {
                let key = format!("cat\x1fns\x1ft{i:03}");
                let loc = format!("s3://wh/{i:03}.metadata.json");
                let (k, l) = (key.clone(), loc.clone());
                s.group_commit(Box::new(move |w| {
                    let mut t = w
                        .open_table(TABLES)
                        .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?;
                    t.insert(k.as_str(), l.as_str())
                        .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?;
                    Ok(CommitOutcome::insert(k, l))
                }))
                .await
                .unwrap();
            }));
        }
        for h in handles {
            h.await.unwrap();
        }
        for i in 0..n {
            let key = format!("cat\x1fns\x1ft{i:03}");
            assert!(store.pointers.get(&key).is_some(), "mirror missing {key}");
        }
        let db = store.db.lock().await;
        let read = db.begin_read().unwrap();
        let t = read.open_table(TABLES).unwrap();
        for i in 0..n {
            let key = format!("cat\x1fns\x1ft{i:03}");
            assert!(t.get(key.as_str()).unwrap().is_some(), "redb missing {key}");
        }
    }

    /// A closure that fails gets its own error back, and later commits still go through.
    #[tokio::test]
    async fn group_commit_surfaces_apply_error_and_keeps_serving() {
        let dir = tempfile::tempdir().unwrap();
        let store = Store::open(dir.path().join("c.redb"), redb::Durability::Immediate).unwrap();
        let rejected = store
            .group_commit(Box::new(
                |_w: &redb::WriteTransaction| -> iceberg::Result<CommitOutcome> {
                    Err(Error::new(ErrorKind::DataInvalid, "rejected before writing"))
                },
            ))
            .await
            .err()
            .expect("a failing apply must surface its own error");
        assert!(matches!(rejected.kind(), ErrorKind::DataInvalid));

        let key = "cat\x1fns\x1fafter".to_string();
        let loc = "s3://wh/after.metadata.json".to_string();
        let (k, l) = (key.clone(), loc.clone());
        store
            .group_commit(Box::new(move |w| {
                let mut t = w
                    .open_table(TABLES)
                    .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?;
                t.insert(k.as_str(), l.as_str())
                    .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?;
                Ok(CommitOutcome::insert(k, l))
            }))
            .await
            .unwrap();
        assert!(store.pointers.get(&key).is_some(), "mirror missing {key}");
    }

    /// `record_commit` writes log entries only for snapshot commits and always bumps counters.
    #[test]
    fn record_commit_appends_log_and_bumps_counters() {
        let dir = tempfile::tempdir().unwrap();
        let db = Database::create(dir.path().join("c.redb")).unwrap();
        {
            let w = db.begin_write().unwrap();
            {
                let _ = w.open_table(COMMITS);
                let _ = w.open_table(META);
            }
            w.commit().unwrap();
        }
        let tk = "cat\x1fns\x1ft";
        {
            let w = db.begin_write().unwrap();
            let s1 = record_commit(&w, tk, Some(10), "loc-a").unwrap();
            let s2 = record_commit(&w, tk, None, "loc-b").unwrap();
            let s3 = record_commit(&w, tk, Some(20), "loc-c").unwrap();
            assert_eq!((s1, s2, s3), (1, 2, 3));
            w.commit().unwrap();
        }
        let r = db.begin_read().unwrap();
        let commits = r.open_table(COMMITS).unwrap();
        assert_eq!(
            commits
                .get(commit_key(tk, 10).as_slice())
                .unwrap()
                .unwrap()
                .value(),
            "loc-a"
        );
        assert_eq!(
            commits
                .get(commit_key(tk, 20).as_slice())
                .unwrap()
                .unwrap()
                .value(),
            "loc-c"
        );
        assert!(commits.get(commit_key(tk, 0).as_slice()).unwrap().is_none());
        let meta = r.open_table(META).unwrap();
        assert_eq!(meta.get(META_COMMIT_SEQ).unwrap().unwrap().value(), 3);
        assert_eq!(meta.get(META_UPDATE_COUNTER).unwrap().unwrap().value(), 3);
    }

    /// Records one snapshot commit for `tk` in its own committed write transaction.
    fn record(db: &Database, tk: &str, sid: i64, loc: &str) {
        let w = db.begin_write().unwrap();
        record_commit(&w, tk, Some(sid), loc).unwrap();
        w.commit().unwrap();
    }

    /// Compaction swaps in an index covering all logged commits and resets both counters.
    #[test]
    fn compaction_builds_swaps_and_resets_counter() {
        let dir = tempfile::tempdir().unwrap();
        let store = Store::open(dir.path().join("c.redb"), redb::Durability::Immediate).unwrap();
        let tk = "cat\x1fns\x1ft";
        {
            let db = store.db.blocking_lock();
            record(&db, tk, 100, "loc-100");
            record(&db, tk, 200, "loc-200");
        }
        assert!(store.static_index.load().is_none());
        assert_eq!(store.cutoff.load(Ordering::Acquire), i64::MIN);
        let n = {
            let db = store.db.blocking_lock();
            store.compact_with_db(&db).unwrap()
        };
        assert_eq!(n, 2);
        let idx = store.static_index.load_full().expect("index swapped in");
        assert_eq!(
            idx.lookup(100).unwrap().metadata_location.as_ref(),
            "loc-100"
        );
        assert_eq!(
            idx.lookup(200).unwrap().metadata_location.as_ref(),
            "loc-200"
        );
        assert_eq!(store.cutoff.load(Ordering::Acquire), 200);
        assert_eq!(store.writes_since_compaction.load(Ordering::Acquire), 0);
        let db = store.db.blocking_lock();
        let r = db.begin_read().unwrap();
        let meta = r.open_table(META).unwrap();
        assert_eq!(meta.get(META_UPDATE_COUNTER).unwrap().unwrap().value(), 0);
    }

    /// Reopening builds the static index and seeds the counter from what is on disk.
    #[test]
    fn reopen_warm_starts_static_index() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("c.redb");
        let tk = "cat\x1fns\x1ft";
        {
            let store = Store::open(path.clone(), redb::Durability::Immediate).unwrap();
            let db = store.db.blocking_lock();
            record(&db, tk, 5, "loc-5");
        }
        let store = Store::open(path, redb::Durability::Immediate).unwrap();
        let idx = store.static_index.load_full().expect("warm-started index");
        assert_eq!(idx.lookup(5).unwrap().metadata_location.as_ref(), "loc-5");
        assert_eq!(store.cutoff.load(Ordering::Acquire), 5);
        assert_eq!(store.writes_since_compaction.load(Ordering::Acquire), 1);
    }
}