use std::{
any::{Any, TypeId},
borrow::Borrow,
cell::RefCell,
hash::Hash,
marker::PhantomData,
sync::Arc,
};
use fxhash::FxHashSet;
use serde::{de::DeserializeOwned, Serialize};
use crate::{
batch::{BatchReference, Operation, VerticalBatch},
db::TableId,
inner::AtomoInner,
keys::VerticalKeys,
serder::SerdeBackend,
snapshot::Snapshot,
KeyIterator,
};
pub struct TableMeta {
pub _name: String,
pub k_id: TypeId,
pub v_id: TypeId,
}
#[derive(Clone, Copy)]
pub struct ResolvedTableReference<K, V> {
atomo_id: usize,
index: TableId,
kv: PhantomData<(K, V)>,
}
pub struct TableSelector<S: SerdeBackend> {
atomo: Arc<AtomoInner<S>>,
snapshot: Snapshot<VerticalBatch, VerticalKeys>,
selected: RefCell<FxHashSet<TableId>>,
batch: VerticalBatch,
keys: RefCell<VerticalKeys>,
}
pub struct TableRef<
'selector,
K: Hash + Eq + Serialize + DeserializeOwned + Any,
V: Serialize + DeserializeOwned + Any,
S: SerdeBackend,
> {
tid: TableId,
batch: BatchReference,
selector: &'selector TableSelector<S>,
kv: PhantomData<(K, V)>,
}
impl TableMeta {
#[inline(always)]
pub fn new<K: Any, V: Any>(_name: String) -> Self {
let k_id = TypeId::of::<K>();
let v_id = TypeId::of::<V>();
Self { _name, k_id, v_id }
}
}
impl<'selector, K, V, S: SerdeBackend> Drop for TableRef<'selector, K, V, S>
where
K: Hash + Eq + Serialize + DeserializeOwned + Any,
V: Serialize + DeserializeOwned + Any,
{
fn drop(&mut self) {
self.selector.selected.borrow_mut().remove(&self.tid);
}
}
impl<S: SerdeBackend> TableSelector<S> {
#[inline]
pub fn new(atomo: Arc<AtomoInner<S>>) -> Self {
let num_tables = atomo.tables.len();
let batch = VerticalBatch::new(num_tables);
let snapshot = atomo.snapshot_list.current();
let keys = snapshot.get_metadata().clone();
Self {
atomo,
snapshot,
selected: RefCell::new(FxHashSet::default()),
batch,
keys: RefCell::new(keys),
}
}
#[inline]
pub(crate) fn into_raw(self) -> (VerticalBatch, VerticalKeys) {
(self.batch, self.keys.into_inner())
}
pub fn get_table<K, V>(&self, name: impl AsRef<str>) -> TableRef<K, V, S>
where
K: Hash + Eq + Serialize + DeserializeOwned + Any,
V: Serialize + DeserializeOwned + Any,
{
self.atomo.resolve::<K, V>(name).get(self)
}
}
impl<K, V> ResolvedTableReference<K, V> {
pub(crate) fn new(atomo_id: usize, index: TableId) -> Self {
ResolvedTableReference {
atomo_id,
index,
kv: PhantomData,
}
}
pub fn get<'selector, S: SerdeBackend>(
&self,
selector: &'selector TableSelector<S>,
) -> TableRef<'selector, K, V, S>
where
K: Hash + Eq + Serialize + DeserializeOwned + Any,
V: Serialize + DeserializeOwned + Any,
{
assert_eq!(
self.atomo_id, selector.atomo.id,
"Table reference of another Atomo instance was used."
);
if !selector.selected.borrow_mut().insert(self.index) {
panic!("Table reference is already claimed.");
}
let batch = unsafe { selector.batch.claim(self.index as usize) };
TableRef {
tid: self.index,
batch,
selector,
kv: PhantomData,
}
}
}
impl<'selector, K, V, S: SerdeBackend> TableRef<'selector, K, V, S>
where
K: Hash + Eq + Serialize + DeserializeOwned + Any,
V: Serialize + DeserializeOwned + Any,
{
pub fn insert(&mut self, key: impl Borrow<K>, value: impl Borrow<V>) {
let k = S::serialize(key.borrow()).into_boxed_slice();
let v = S::serialize(value.borrow()).into_boxed_slice();
self.selector
.keys
.borrow_mut()
.update(self.tid, |collection| {
collection.insert(k.clone());
});
self.batch.as_mut().insert(k, Operation::Insert(v));
}
pub fn remove(&mut self, key: impl Borrow<K>) {
let k = S::serialize(key.borrow()).into_boxed_slice();
self.selector
.keys
.borrow_mut()
.update(self.tid, |collection| {
collection.remove(&k);
});
self.batch.as_mut().insert(k, Operation::Remove);
}
pub fn get(&self, key: impl Borrow<K>) -> Option<V> {
let k = S::serialize(key.borrow()).into_boxed_slice();
match self.batch.get(&k) {
Some(Operation::Insert(value)) => return Some(S::deserialize(value)),
Some(Operation::Remove) => return None,
_ => {},
}
let index = self.tid as usize;
if let Some(operation) = self
.selector
.snapshot
.find(|batch| batch.get(index).get(&k))
{
return match operation {
Operation::Remove => None,
Operation::Insert(value) => Some(S::deserialize(value)),
};
}
self.selector.atomo.get::<V>(self.tid, &k)
}
pub fn contains_key(&self, key: impl Borrow<K>) -> bool {
let k = S::serialize(key.borrow()).into_boxed_slice();
{
let keys_ref = self.selector.keys.borrow();
if let Some(im) = keys_ref.get(self.tid) {
return im.contains(&k);
}
}
match self.batch.get(&k) {
Some(Operation::Insert(_)) => return true,
Some(Operation::Remove) => return false,
_ => {},
}
let index = self.tid as usize;
if let Some(op) = self
.selector
.snapshot
.find(|batch| batch.get(index).get(&k))
{
return match op {
Operation::Insert(_) => true,
Operation::Remove => false,
};
}
self.selector.atomo.contains_key(self.tid, &k)
}
pub fn keys(&self) -> KeyIterator<K> {
let keys = self
.selector
.keys
.borrow()
.get(self.tid)
.clone()
.expect("Iterator functionality is not enabled for the table.");
KeyIterator::new(keys)
}
}