use std::ops::Bound;
use fjall::{Keyspace, OptimisticTxDatabase, OptimisticTxKeyspace, Readable};
use crate::utils::CollectionInfo;
use crate::{DbError, DbResult, migration};
use super::protocol::UpsertKey;
pub(crate) trait RpcHandler: Send + Sync {
fn name(&self) -> &str;
fn partition_name(&self) -> String;
fn info(&self) -> (u64, u16);
fn get(&self, key: &[u8]) -> DbResult<Option<Vec<u8>>>;
fn contains(&self, key: &[u8]) -> DbResult<Option<u32>>;
fn first(&self) -> DbResult<Option<(Vec<u8>, Vec<u8>)>>;
fn last(&self) -> DbResult<Option<(Vec<u8>, Vec<u8>)>>;
fn range(
&self,
start: Bound<Vec<u8>>,
end: Bound<Vec<u8>>,
) -> DbResult<Vec<(Vec<u8>, Vec<u8>)>>;
fn range_keys(&self, start: Bound<Vec<u8>>, end: Bound<Vec<u8>>) -> DbResult<Vec<Vec<u8>>>;
fn upsert(&self, key: UpsertKey, flag: Option<bool>, value: Vec<u8>) -> DbResult<Vec<u8>>;
fn remove(&self, key: &[u8], soft: bool) -> DbResult<()>;
fn take(&self, key: &[u8], soft: bool) -> DbResult<Option<Vec<u8>>>;
fn count(&self, exact: bool) -> DbResult<u64>;
fn apply_batch(&self, items: Vec<(Vec<u8>, Option<Vec<u8>>)>) -> DbResult<()>;
}
fn guard_to_kv(guard: fjall::Guard) -> Option<(Vec<u8>, Vec<u8>)> {
match guard.into_inner() {
Ok((k, v)) => Some((k.to_vec(), v.to_vec())),
Err(e) => {
error!(%e);
None
}
}
}
fn guard_to_key(guard: fjall::Guard) -> Option<Vec<u8>> {
match guard.key() {
Ok(k) => Some(k.to_vec()),
Err(e) => {
error!(%e);
None
}
}
}
fn next_id(seq_tree: &Keyspace, name: &str) -> DbResult<u64> {
let key = format!("__next_id-{}", name);
let val = seq_tree.get(&key)?;
let val = match val {
Some(val) => {
let bytes = val.as_ref().try_into().expect("Invalid byte array length");
u64::from_le_bytes(bytes)
}
None => 1,
};
let bytes = (val + 1).to_le_bytes();
seq_tree.insert(&key, bytes)?;
Ok(val)
}
fn tx_next_id(
db: &OptimisticTxDatabase,
seq_tree: &OptimisticTxKeyspace,
name: &str,
) -> DbResult<u64> {
let next_id_key = format!("next_id-{}", name);
let key_ref = next_id_key.as_bytes();
loop {
let mut tx = db.write_tx().map_err(DbError::from)?;
let current = tx.get(seq_tree, key_ref).map_err(DbError::from)?;
let mut id = 1u64;
if let Some(bytes) = current {
let bytes = bytes
.as_ref()
.try_into()
.map_err(|err| DbError::Armour(crate::types::ArmourError::from(err)))?;
let old = u64::from_le_bytes(bytes);
id = old + 1;
}
let next_val = id.to_le_bytes();
tx.insert(seq_tree, key_ref, next_val);
match tx.commit() {
Ok(_) => return Ok(id),
Err(_) => continue,
}
}
}
pub(crate) struct KsHandler {
pub name: String,
pub info: CollectionInfo,
pub tree: Keyspace,
pub seq_tree: Keyspace,
}
impl RpcHandler for KsHandler {
fn name(&self) -> &str {
&self.name
}
fn partition_name(&self) -> String {
migration::collection_name(&self.name, self.info.version)
}
fn info(&self) -> (u64, u16) {
(self.info.typ_hash, self.info.version)
}
fn get(&self, key: &[u8]) -> DbResult<Option<Vec<u8>>> {
self.tree
.get(key)
.map(|item| item.map(|v| v.to_vec()))
.map_err(DbError::from)
}
fn contains(&self, key: &[u8]) -> DbResult<Option<u32>> {
match self.tree.get(key)? {
Some(v) => Ok(Some(v.len() as u32)),
None => Ok(None),
}
}
fn first(&self) -> DbResult<Option<(Vec<u8>, Vec<u8>)>> {
match self.tree.first_key_value() {
Some(guard) => Ok(guard_to_kv(guard)),
None => Ok(None),
}
}
fn last(&self) -> DbResult<Option<(Vec<u8>, Vec<u8>)>> {
match self.tree.last_key_value() {
Some(guard) => Ok(guard_to_kv(guard)),
None => Ok(None),
}
}
fn range(
&self,
start: Bound<Vec<u8>>,
end: Bound<Vec<u8>>,
) -> DbResult<Vec<(Vec<u8>, Vec<u8>)>> {
Ok(self
.tree
.range((start, end))
.filter_map(guard_to_kv)
.collect())
}
fn range_keys(&self, start: Bound<Vec<u8>>, end: Bound<Vec<u8>>) -> DbResult<Vec<Vec<u8>>> {
Ok(self
.tree
.range((start, end))
.filter_map(guard_to_key)
.collect())
}
fn upsert(&self, key: UpsertKey, flag: Option<bool>, value: Vec<u8>) -> DbResult<Vec<u8>> {
let key_bytes = match key {
UpsertKey::Sequence => {
let id = next_id(&self.seq_tree, &self.name)?;
id.to_le_bytes().to_vec()
}
UpsertKey::Provided(k) => k,
};
if let Some(update_only) = flag {
let exists = self.tree.contains_key(&key_bytes)?;
if update_only && !exists {
return Err(DbError::NotFound);
}
if !update_only && exists {
return Err(DbError::Client("already exists"));
}
}
self.tree.insert(&key_bytes, &value)?;
Ok(key_bytes)
}
fn remove(&self, key: &[u8], _soft: bool) -> DbResult<()> {
self.tree.remove(key)?;
Ok(())
}
fn take(&self, _key: &[u8], _soft: bool) -> DbResult<Option<Vec<u8>>> {
Err(DbError::NotImplemented)
}
fn count(&self, exact: bool) -> DbResult<u64> {
if exact {
Ok(self.tree.iter().count() as u64)
} else {
Ok(self.tree.approximate_len() as u64)
}
}
fn apply_batch(&self, items: Vec<(Vec<u8>, Option<Vec<u8>>)>) -> DbResult<()> {
for (key, value) in items {
match value {
Some(v) => self.tree.insert(&key, &v)?,
None => self.tree.remove(&key)?,
};
}
Ok(())
}
}
pub(crate) struct TxKsHandler {
pub name: String,
pub info: CollectionInfo,
pub tree: OptimisticTxKeyspace,
pub seq_tree: OptimisticTxKeyspace,
pub db: OptimisticTxDatabase,
}
impl RpcHandler for TxKsHandler {
fn name(&self) -> &str {
&self.name
}
fn partition_name(&self) -> String {
migration::collection_name(&self.name, self.info.version)
}
fn info(&self) -> (u64, u16) {
(self.info.typ_hash, self.info.version)
}
fn get(&self, key: &[u8]) -> DbResult<Option<Vec<u8>>> {
self.tree
.get(key)
.map(|item| item.map(|v| v.to_vec()))
.map_err(DbError::from)
}
fn contains(&self, key: &[u8]) -> DbResult<Option<u32>> {
match self.tree.get(key)? {
Some(v) => Ok(Some(v.len() as u32)),
None => Ok(None),
}
}
fn first(&self) -> DbResult<Option<(Vec<u8>, Vec<u8>)>> {
match self.tree.first_key_value() {
Some(guard) => Ok(guard_to_kv(guard)),
None => Ok(None),
}
}
fn last(&self) -> DbResult<Option<(Vec<u8>, Vec<u8>)>> {
match self.tree.last_key_value() {
Some(guard) => Ok(guard_to_kv(guard)),
None => Ok(None),
}
}
fn range(
&self,
start: Bound<Vec<u8>>,
end: Bound<Vec<u8>>,
) -> DbResult<Vec<(Vec<u8>, Vec<u8>)>> {
let tx = self.db.read_tx();
Ok(tx
.range(&self.tree, (start, end))
.filter_map(guard_to_kv)
.collect())
}
fn range_keys(&self, start: Bound<Vec<u8>>, end: Bound<Vec<u8>>) -> DbResult<Vec<Vec<u8>>> {
let tx = self.db.read_tx();
Ok(tx
.range(&self.tree, (start, end))
.filter_map(guard_to_key)
.collect())
}
fn upsert(&self, key: UpsertKey, flag: Option<bool>, value: Vec<u8>) -> DbResult<Vec<u8>> {
let key_bytes = match key {
UpsertKey::Sequence => {
let id = tx_next_id(&self.db, &self.seq_tree, &self.name)?;
id.to_le_bytes().to_vec()
}
UpsertKey::Provided(k) => k,
};
loop {
let mut tx = self.db.write_tx().map_err(DbError::from)?;
if let Some(update_only) = flag {
let exists = tx
.contains_key(&self.tree, &key_bytes)
.map_err(DbError::from)?;
if update_only && !exists {
return Err(DbError::NotFound);
}
if !update_only && exists {
return Err(DbError::Client("already exists"));
}
}
tx.insert(&self.tree, &key_bytes, &value);
match tx.commit() {
Ok(_) => return Ok(key_bytes),
Err(_) => continue,
}
}
}
fn remove(&self, key: &[u8], _soft: bool) -> DbResult<()> {
loop {
let mut tx = self.db.write_tx().map_err(DbError::from)?;
tx.remove(&self.tree, key);
match tx.commit() {
Ok(_) => return Ok(()),
Err(_) => continue,
}
}
}
fn take(&self, key: &[u8], _soft: bool) -> DbResult<Option<Vec<u8>>> {
loop {
let mut tx = self.db.write_tx().map_err(DbError::from)?;
let old = tx.get(&self.tree, key).map_err(DbError::from)?;
if old.is_some() {
tx.remove(&self.tree, key);
}
match tx.commit() {
Ok(_) => return Ok(old.map(|v: fjall::Slice| v.to_vec())),
Err(_) => continue,
}
}
}
fn count(&self, exact: bool) -> DbResult<u64> {
if exact {
let tx = self.db.read_tx();
Ok(tx.range::<Vec<u8>, _>(&self.tree, ..).count() as u64)
} else {
Ok(self.tree.approximate_len() as u64)
}
}
fn apply_batch(&self, items: Vec<(Vec<u8>, Option<Vec<u8>>)>) -> DbResult<()> {
loop {
let mut tx = self.db.write_tx().map_err(DbError::from)?;
for (key, value) in &items {
match value {
Some(v) => tx.insert(&self.tree, key, v),
None => tx.remove(&self.tree, key),
}
}
match tx.commit() {
Ok(_) => return Ok(()),
Err(_) => continue,
}
}
}
}