use super::error::RedbResult;
use super::tables::{TABLE_DATA, TABLE_LOG};
use crate::store::{StoreEvent, StoreOp, SubscriptionEntry, matches_kind};
use bytes::Bytes;
use redb::{Database, ReadableTable};
use std::sync::Arc;
use std::sync::RwLock;
/// Drains `TABLE_LOG` and notifies local subscribers about every logged path.
///
/// Inside a single write transaction, each log entry is turned into a
/// [`StoreEvent`] reflecting what `TABLE_DATA` currently holds for that path:
/// `StoreOp::Set` with a copy of the stored bytes when a value is present,
/// `StoreOp::Delete` with no payload otherwise. The `old` field is always
/// `None`. Every visited log entry is then removed and the transaction is
/// committed.
///
/// Events are handed to [`emit_local`] only after the commit has succeeded,
/// in the order the log was iterated.
///
/// # Errors
///
/// Propagates any error raised while opening the transaction or tables,
/// reading entries, removing log rows, or committing. On error no event is
/// emitted.
pub(super) fn process_inbox(
    db: &Database,
    subs: &RwLock<Vec<SubscriptionEntry>>,
) -> RedbResult<()> {
    let txn = db.begin_write()?;
    let mut pending = Vec::new();

    // Inner scope: both table handles must be dropped before `commit`.
    {
        let mut log = txn.open_table(TABLE_LOG)?;
        let data = txn.open_table(TABLE_DATA)?;

        // Ids are collected first and removed afterwards, so the log is not
        // mutated while it is being iterated.
        let mut consumed_ids = Vec::new();
        for entry in log.iter()? {
            let (id_guard, path_guard) = entry?;
            let path = path_guard.value();

            // The presence of a value decides both the operation and payload.
            let (op, new) = match data.get(path)? {
                Some(stored) => (
                    StoreOp::Set,
                    Some(Bytes::copy_from_slice(stored.value())),
                ),
                None => (StoreOp::Delete, None),
            };

            pending.push(StoreEvent {
                path: Arc::from(path),
                op,
                old: None,
                new,
            });
            consumed_ids.push(id_guard.value());
        }

        for id in consumed_ids {
            log.remove(id)?;
        }
    }

    txn.commit()?;

    pending
        .into_iter()
        .for_each(|event| emit_local(subs, event));
    Ok(())
}
/// Invokes the callback of every subscription whose kind matches `event.path`.
///
/// The matching callbacks are cloned out of the subscription list while the
/// read lock is held; the lock is released before any callback runs.
/// Callbacks are invoked in subscription-list order, each receiving a
/// reference to the same `event`.
///
/// # Panics
///
/// Panics if `subs_lock` is poisoned.
pub(super) fn emit_local(subs_lock: &RwLock<Vec<SubscriptionEntry>>, event: StoreEvent) {
    let mut matching = Vec::new();

    // Keep the read guard confined to this scope so it is dropped before the
    // callbacks execute (presumably so a callback may touch the subscription
    // list itself; confirm against callers).
    {
        let subs = subs_lock.read().unwrap();
        for sub in subs.iter() {
            if matches_kind(&sub.kind, &event.path) {
                matching.push(sub.callback.clone());
            }
        }
    }

    for callback in matching {
        callback(&event);
    }
}