use std::{
any::{Any, TypeId},
hash::Hash,
marker::PhantomData,
sync::atomic::AtomicUsize,
};
use dashmap::DashMap;
use fxhash::FxHashMap;
use serde::{de::DeserializeOwned, Serialize};
use crate::{
batch::{Operation, VerticalBatch},
db::TableId,
keys::VerticalKeys,
serder::SerdeBackend,
snapshot::SnapshotList,
table::{ResolvedTableReference, TableMeta},
};
/// Process-wide counter from which `AtomoInner::empty` draws the `id` of each new instance.
static INSTANCE_COUNT: AtomicUsize = AtomicUsize::new(0);
/// Backing store for a single table: a `DashMap` from raw key bytes to raw value bytes, hashed
/// with `FxBuildHasher`. As the name suggests, this stands in for a real persistence layer.
type MockPersistance = DashMap<Box<[u8]>, Box<[u8]>, fxhash::FxBuildHasher>;
/// State shared by the table-level operations of this module, generic over the serialization
/// backend `S`.
///
/// Tables are addressed by a numeric [`TableId`]; the methods on this type use that id as an
/// index into both `persistence` and `tables`.
pub struct AtomoInner<S: SerdeBackend> {
/// Identifier of this instance, taken from the global `INSTANCE_COUNT` counter in `empty`.
pub id: usize,
/// One key/value byte map per table, indexed by table id.
pub persistence: Vec<MockPersistance>,
/// Metadata per table, indexed by table id. `resolve` compares the recorded key and value
/// type ids (`k_id`, `v_id`) against the types requested by the caller.
pub tables: Vec<TableMeta>,
/// Lookup from a table's name to its table id.
pub table_name_to_id: FxHashMap<String, TableId>,
/// Snapshot list parameterized over `VerticalBatch` and `VerticalKeys`; `empty` initializes it
/// with `SnapshotList::default()`.
pub snapshot_list: SnapshotList<VerticalBatch, VerticalKeys>,
/// Zero-sized marker that records the backend type `S` without storing a value of it.
serde: PhantomData<S>,
}
impl<S: SerdeBackend> AtomoInner<S> {
    /// Creates an instance with no tables, no name mappings and a default snapshot list.
    ///
    /// The instance `id` is the previous value of the global `INSTANCE_COUNT`, which is
    /// incremented by one on every call.
    pub fn empty() -> Self {
        let id = INSTANCE_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        AtomoInner {
            id,
            persistence: Vec::new(),
            tables: Vec::new(),
            table_name_to_id: FxHashMap::default(),
            snapshot_list: SnapshotList::default(),
            serde: PhantomData,
        }
    }

    /// Applies every operation of `batch` to the backing tables.
    ///
    /// The per-table parts of the batch are paired with `self.persistence` by position, so the
    /// n-th part is applied to the n-th table. Pairing stops at the shorter of the two
    /// sequences.
    pub fn perform_batch(&self, batch: VerticalBatch) {
        for (table, batch) in self.persistence.iter().zip(batch.into_raw()) {
            for (key, operation) in batch {
                match operation {
                    Operation::Remove => {
                        table.remove(&key);
                    },
                    Operation::Insert(value) => {
                        table.insert(key, value);
                    },
                }
            }
        }
    }

    /// Looks up the table called `name` and returns a typed reference to it.
    ///
    /// The returned reference is built from this instance's `id` and the table's index.
    ///
    /// # Panics
    ///
    /// Panics if no table named `name` is registered, or if `K` or `V` is not the key or value
    /// type recorded in the table's metadata.
    #[inline]
    pub fn resolve<K, V>(&self, name: impl AsRef<str>) -> ResolvedTableReference<K, V>
    where
        K: Hash + Eq + Serialize + DeserializeOwned + Any,
        V: Serialize + DeserializeOwned + Any,
    {
        let name = name.as_ref();
        let index = *self
            .table_name_to_id
            .get(name)
            .unwrap_or_else(|| panic!("Table {name} not found."));
        let info = &self.tables[index as usize];

        let k_id = TypeId::of::<K>();
        let v_id = TypeId::of::<V>();
        // Type names are only used to make the panic messages below readable.
        let k_str = std::any::type_name::<K>();
        let v_str = std::any::type_name::<V>();

        assert_eq!(
            info.k_id, k_id,
            "Could not resolve table '{name}' with key type '{k_str}'."
        );
        assert_eq!(
            info.v_id, v_id,
            "Could not resolve table '{name}' with value type '{v_str}'."
        );

        ResolvedTableReference::<K, V>::new(self.id, index)
    }

    /// Builds the batch that undoes `batch` relative to the values currently stored.
    ///
    /// For every key touched by `batch` the current value is read with `get_raw`:
    ///
    /// * a removal of an existing key is inverted into an insert of the current value;
    /// * an insert of a new key is inverted into a removal;
    /// * an insert that changes an existing value is inverted into an insert of the current
    ///   value;
    /// * operations that would not change the stored state produce no inverse entry.
    ///
    /// The result has one part per entry in `self.tables`.
    #[inline]
    pub fn compute_inverse(&self, batch: &VerticalBatch) -> VerticalBatch {
        let num_tables = self.tables.len();
        let mut inverse = VerticalBatch::new(num_tables);

        for t in 0..num_tables {
            let table = inverse.get_mut(t);
            let batch_table = batch.get(t);
            // Upper bound: at most one inverse entry per operation in the batch.
            table.reserve(batch_table.len());

            for (key, op) in batch_table.iter() {
                match (op, self.get_raw(t as TableId, key)) {
                    (Operation::Remove, Some(old_value)) => {
                        table.insert(key.clone(), Operation::Insert(old_value.into_boxed_slice()));
                    },
                    (Operation::Insert(_new_value), None) => {
                        table.insert(key.clone(), Operation::Remove);
                    },
                    (Operation::Insert(new_value), Some(old_value))
                        if new_value[..] != old_value[..] =>
                    {
                        table.insert(key.clone(), Operation::Insert(old_value.into_boxed_slice()));
                    },
                    // Removing a missing key or re-inserting the same bytes changes nothing,
                    // so there is nothing to undo.
                    (Operation::Remove, None) | (Operation::Insert(_), Some(_)) => {},
                }
            }

            // The reservation above assumed every operation needs an inverse; release the
            // excess once at least half of the capacity is unused.
            if table.capacity() >= table.len() * 2 {
                table.shrink_to_fit();
            }
        }

        inverse
    }
}
impl<S: SerdeBackend> AtomoInner<S> {
    /// Returns a copy of the raw bytes stored under `key` in table `tid`, or `None` when the
    /// key is absent.
    ///
    /// # Panics
    ///
    /// Panics if `tid` is not a valid index into `self.persistence`.
    pub fn get_raw(&self, tid: TableId, key: &[u8]) -> Option<Vec<u8>> {
        let table = &self.persistence[tid as usize];
        let entry = table.get(key)?;
        Some(entry.to_vec())
    }

    /// Reads the bytes stored under `key` in table `tid` and decodes them into a `V` with
    /// `S::deserialize`. Returns `None` when the key is absent.
    ///
    /// # Panics
    ///
    /// Panics if `tid` is not a valid index into `self.persistence`.
    pub fn get<V>(&self, tid: TableId, key: &[u8]) -> Option<V>
    where
        V: Serialize + DeserializeOwned + Any,
    {
        let bytes = self.get_raw(tid, key)?;
        Some(S::deserialize(&bytes))
    }

    /// Reports whether table `tid` currently holds an entry for `key`.
    ///
    /// # Panics
    ///
    /// Panics if `tid` is not a valid index into `self.persistence`.
    pub fn contains_key(&self, tid: TableId, key: &[u8]) -> bool {
        let table = &self.persistence[tid as usize];
        table.contains_key(key)
    }
}