use std::hash::Hash;
use std::ops::Bound;
use std::sync::Arc;
use armour_rpc::protocol::UpsertKey;
use crate::hook::TypedWriteHook;
use crate::zero_tree::{from_value_bytes, to_bytes};
use crate::{Codec, DbError, DbResult, Key, TypedMap, TypedTree, ZeroMap, ZeroTree};
pub type KeyBytes = Vec<u8>;
pub type ValueBytes = Vec<u8>;
pub trait RpcHandler: Send + Sync {
fn name(&self) -> &str;
fn info(&self) -> (u64, u16);
fn schema(&self) -> armour_rpc::SchemaResponse;
fn get(&self, key: &[u8]) -> DbResult<Option<ValueBytes>>;
fn contains(&self, key: &[u8]) -> DbResult<bool>;
fn entry_len(&self, key: &[u8]) -> DbResult<Option<u32>>;
fn first(&self) -> DbResult<Option<(KeyBytes, ValueBytes)>>;
fn last(&self) -> DbResult<Option<(KeyBytes, ValueBytes)>>;
fn range(
&self,
start: Bound<KeyBytes>,
end: Bound<KeyBytes>,
limit: u32,
) -> DbResult<Vec<(KeyBytes, ValueBytes)>>;
fn range_keys(
&self,
start: Bound<KeyBytes>,
end: Bound<KeyBytes>,
limit: u32,
) -> DbResult<Vec<ValueBytes>>;
fn upsert(&self, key: UpsertKey, flag: Option<bool>, value: ValueBytes)
-> DbResult<ValueBytes>;
fn remove(&self, key: &[u8], soft: bool) -> DbResult<()>;
fn take(&self, key: &[u8], soft: bool) -> DbResult<Option<ValueBytes>>;
fn count(&self, exact: bool) -> DbResult<u64>;
fn apply_batch(&self, items: Vec<(KeyBytes, Option<ValueBytes>)>) -> DbResult<()>;
}
fn check_key_len<K: Key>(key: &[u8]) -> DbResult<K> {
if key.len() != size_of::<K>() {
return Err(DbError::KeyNotFound);
}
Ok(K::from_bytes(key))
}
fn resolve_key<K: Key>(
key: UpsertKey,
seq: &super::seq::SeqGen,
name: &str,
) -> DbResult<(K, KeyBytes)> {
let key_bytes = match key {
UpsertKey::Sequence => {
let id = seq.next_id(name)?;
id.to_le_bytes().to_vec()
}
UpsertKey::Provided(k) => k,
};
if key_bytes.len() != size_of::<K>() {
return Err(DbError::Config("invalid key bytes"));
}
Ok((K::from_bytes(&key_bytes), key_bytes))
}
#[allow(dead_code)]
fn check_flag(tree_contains: bool, flag: Option<bool>) -> DbResult<()> {
if let Some(update_only) = flag {
if update_only && !tree_contains {
return Err(DbError::KeyNotFound);
}
if !update_only && tree_contains {
return Err(DbError::KeyExists);
}
}
Ok(())
}
fn bound_to_key_bound<K: Key>(bound: &Bound<KeyBytes>) -> DbResult<Bound<K>> {
match bound {
Bound::Included(b) => {
if b.len() != size_of::<K>() {
return Err(DbError::Config("invalid bound key bytes"));
}
Ok(Bound::Included(K::from_bytes(b)))
}
Bound::Excluded(b) => {
if b.len() != size_of::<K>() {
return Err(DbError::Config("invalid bound key bytes"));
}
Ok(Bound::Excluded(K::from_bytes(b)))
}
Bound::Unbounded => Ok(Bound::Unbounded),
}
}
fn apply_limit<I: Iterator>(iter: I, limit: u32) -> impl Iterator<Item = I::Item> {
let take = if limit == 0 {
usize::MAX
} else {
limit as usize
};
iter.take(take)
}
fn unsupported(op: &str) -> DbResult<()> {
let msg: &'static str = match op {
"first" => "first not supported for map collections",
"last" => "last not supported for map collections",
"range" => "range not supported for map collections",
"range_keys" => "range_keys not supported for map collections",
_ => "operation not supported for map collections",
};
Err(DbError::Config(msg))
}
pub struct TypedTreeHandler<K, T, C, H: TypedWriteHook<K, T> = crate::NoHook>
where
K: Key + Ord,
T: Send + Sync,
C: Codec<T>,
{
pub name: String,
pub typ_hash: u64,
pub ty: armour_core::Typ,
pub key_scheme: armour_core::KeyScheme,
pub version: u16,
pub tree: Arc<TypedTree<K, T, C, H>>,
pub codec: Arc<C>,
pub seq: Arc<super::seq::SeqGen>,
}
impl<K, T, C, H> RpcHandler for TypedTreeHandler<K, T, C, H>
where
K: Key + Ord + Send + Sync,
T: Send + Sync,
C: Codec<T>,
H: TypedWriteHook<K, T>,
{
fn name(&self) -> &str {
&self.name
}
fn info(&self) -> (u64, u16) {
(self.typ_hash, self.version)
}
fn schema(&self) -> armour_rpc::SchemaResponse {
armour_rpc::SchemaResponse {
name: self.name.clone(),
version: self.version,
typ: self.ty,
key_scheme: self.key_scheme,
}
}
fn get(&self, key: &[u8]) -> DbResult<Option<ValueBytes>> {
let key: K = check_key_len(key)?;
match self.tree.get(&key) {
Some(val) => {
let mut buf = Vec::new();
self.codec.encode_to(&val, &mut buf)?;
Ok(Some(buf))
}
None => Ok(None),
}
}
fn contains(&self, key: &[u8]) -> DbResult<bool> {
let k = check_key_len::<K>(key)?;
Ok(self.tree.contains(&k))
}
fn entry_len(&self, key: &[u8]) -> DbResult<Option<u32>> {
let k = check_key_len::<K>(key)?;
Ok(self.tree.get(&k).map(|v| self.codec.size(&v) as u32))
}
fn first(&self) -> DbResult<Option<(KeyBytes, ValueBytes)>> {
match self.tree.first() {
Some((k, v)) => {
let mut buf = Vec::new();
self.codec.encode_to(&v, &mut buf)?;
Ok(Some((k.as_bytes().to_vec(), buf)))
}
None => Ok(None),
}
}
fn last(&self) -> DbResult<Option<(KeyBytes, ValueBytes)>> {
match self.tree.last() {
Some((k, v)) => {
let mut buf = Vec::new();
self.codec.encode_to(&v, &mut buf)?;
Ok(Some((k.as_bytes().to_vec(), buf)))
}
None => Ok(None),
}
}
fn range(
&self,
start: Bound<KeyBytes>,
end: Bound<KeyBytes>,
limit: u32,
) -> DbResult<Vec<(KeyBytes, ValueBytes)>> {
let sb = bound_to_key_bound::<K>(&start)?;
let eb = bound_to_key_bound::<K>(&end)?;
let iter = self.tree.range_bounds(sb.as_ref(), eb.as_ref());
let mut result = Vec::new();
for (k, v) in apply_limit(iter, limit) {
let mut buf = Vec::new();
self.codec.encode_to(v, &mut buf)?;
result.push((k.as_bytes().to_vec(), buf));
}
Ok(result)
}
fn range_keys(
&self,
start: Bound<KeyBytes>,
end: Bound<KeyBytes>,
limit: u32,
) -> DbResult<Vec<ValueBytes>> {
let sb = bound_to_key_bound::<K>(&start)?;
let eb = bound_to_key_bound::<K>(&end)?;
let iter = self.tree.range_bounds(sb.as_ref(), eb.as_ref());
let mut result = Vec::new();
for (k, _) in apply_limit(iter, limit) {
result.push(k.as_bytes().to_vec());
}
Ok(result)
}
fn upsert(
&self,
key: UpsertKey,
flag: Option<bool>,
value: ValueBytes,
) -> DbResult<ValueBytes> {
let (typed_key, key_bytes) = resolve_key::<K>(key, &self.seq, &self.name)?;
let decoded = self.codec.decode_from(&value)?;
match flag {
None => {
self.tree.put(&typed_key, decoded)?;
Ok(key_bytes)
}
Some(false) => self
.tree
.atomic(&typed_key, |shard| {
if shard.contains(&typed_key) {
return Err(DbError::KeyExists);
}
shard.put(&typed_key, decoded)?;
Ok(())
})
.map(|_| key_bytes),
Some(true) => self
.tree
.atomic(&typed_key, |shard| {
if !shard.contains(&typed_key) {
return Err(DbError::KeyNotFound);
}
shard.put(&typed_key, decoded)?;
Ok(())
})
.map(|_| key_bytes),
}
}
fn remove(&self, key: &[u8], soft: bool) -> DbResult<()> {
if soft {
return Err(DbError::NotImplemented);
}
let k = check_key_len::<K>(key)?;
self.tree.delete(&k).map(|_| ())
}
fn take(&self, key: &[u8], soft: bool) -> DbResult<Option<ValueBytes>> {
if soft {
return Err(DbError::NotImplemented);
}
let k = check_key_len::<K>(key)?;
let codec = Arc::clone(&self.codec);
self.tree.atomic(&k, |shard| {
if let Some(v) = shard.get(&k) {
let mut buf = Vec::new();
codec.encode_to(v, &mut buf)?;
shard.delete(&k)?;
Ok(Some(buf))
} else {
Ok(None)
}
})
}
fn count(&self, exact: bool) -> DbResult<u64> {
Ok(if exact {
self.tree.iter().count() as u64
} else {
self.tree.len() as u64
})
}
fn apply_batch(&self, items: Vec<(KeyBytes, Option<ValueBytes>)>) -> DbResult<()> {
let decoded_items: Vec<(K, Option<T>)> = items
.into_iter()
.map(|(key, val)| {
if key.len() != size_of::<K>() {
return Err(DbError::Client("invalid key length in batch"));
}
let decoded_val = match val {
Some(v) => Some(
self.codec
.decode_from(&v)
.map_err(|_| DbError::Client("invalid value bytes in batch"))?,
),
None => None,
};
Ok((K::from_bytes(&key), decoded_val))
})
.collect::<DbResult<_>>()?;
for (k, val) in decoded_items {
match val {
Some(decoded) => {
self.tree.put(&k, decoded)?;
}
None => {
self.tree.delete(&k)?;
}
}
}
Ok(())
}
}
pub struct TypedMapHandler<K, T, C, H: TypedWriteHook<K, T> = crate::NoHook>
where
K: Key + Send + Sync + Hash + Eq,
T: Send + Sync,
C: Codec<T>,
{
pub name: String,
pub typ_hash: u64,
pub ty: armour_core::Typ,
pub key_scheme: armour_core::KeyScheme,
pub version: u16,
pub map: Arc<TypedMap<K, T, C, H>>,
pub codec: Arc<C>,
pub seq: Arc<super::seq::SeqGen>,
}
impl<K, T, C, H> RpcHandler for TypedMapHandler<K, T, C, H>
where
K: Key + Send + Sync + Hash + Eq,
T: Send + Sync,
C: Codec<T>,
H: TypedWriteHook<K, T>,
{
fn name(&self) -> &str {
&self.name
}
fn info(&self) -> (u64, u16) {
(self.typ_hash, self.version)
}
fn schema(&self) -> armour_rpc::SchemaResponse {
armour_rpc::SchemaResponse {
name: self.name.clone(),
version: self.version,
typ: self.ty,
key_scheme: self.key_scheme,
}
}
fn get(&self, key: &[u8]) -> DbResult<Option<ValueBytes>> {
let key: K = check_key_len(key)?;
match self.map.get(&key) {
Some(val) => {
let mut buf = Vec::new();
self.codec.encode_to(&val, &mut buf)?;
Ok(Some(buf))
}
None => Ok(None),
}
}
fn contains(&self, key: &[u8]) -> DbResult<bool> {
let k = check_key_len::<K>(key)?;
Ok(self.map.contains(&k))
}
fn entry_len(&self, key: &[u8]) -> DbResult<Option<u32>> {
let k = check_key_len::<K>(key)?;
Ok(self.map.get(&k).map(|v| self.codec.size(&v) as u32))
}
fn first(&self) -> DbResult<Option<(KeyBytes, ValueBytes)>> {
unsupported("first")?;
unreachable!()
}
fn last(&self) -> DbResult<Option<(KeyBytes, ValueBytes)>> {
unsupported("last")?;
unreachable!()
}
fn range(
&self,
_start: Bound<KeyBytes>,
_end: Bound<KeyBytes>,
_limit: u32,
) -> DbResult<Vec<(KeyBytes, ValueBytes)>> {
unsupported("range")?;
unreachable!()
}
fn range_keys(
&self,
_start: Bound<KeyBytes>,
_end: Bound<KeyBytes>,
_limit: u32,
) -> DbResult<Vec<ValueBytes>> {
unsupported("range_keys")?;
unreachable!()
}
fn upsert(
&self,
key: UpsertKey,
flag: Option<bool>,
value: ValueBytes,
) -> DbResult<ValueBytes> {
let (typed_key, key_bytes) = resolve_key::<K>(key, &self.seq, &self.name)?;
let decoded = self.codec.decode_from(&value)?;
match flag {
None => {
self.map.put(&typed_key, decoded)?;
Ok(key_bytes)
}
Some(false) => self
.map
.atomic(&typed_key, |shard| {
if shard.contains(&typed_key) {
return Err(DbError::KeyExists);
}
shard.put(&typed_key, decoded)?;
Ok(())
})
.map(|_| key_bytes),
Some(true) => self
.map
.atomic(&typed_key, |shard| {
if !shard.contains(&typed_key) {
return Err(DbError::KeyNotFound);
}
shard.put(&typed_key, decoded)?;
Ok(())
})
.map(|_| key_bytes),
}
}
fn remove(&self, key: &[u8], soft: bool) -> DbResult<()> {
if soft {
return Err(DbError::NotImplemented);
}
let k = check_key_len::<K>(key)?;
self.map.delete(&k).map(|_| ())
}
fn take(&self, key: &[u8], soft: bool) -> DbResult<Option<ValueBytes>> {
if soft {
return Err(DbError::NotImplemented);
}
let k = check_key_len::<K>(key)?;
let codec = Arc::clone(&self.codec);
self.map.atomic(&k, |shard| {
if let Some(v) = shard.get(&k) {
let mut buf = Vec::new();
codec.encode_to(v, &mut buf)?;
shard.delete(&k)?;
Ok(Some(buf))
} else {
Ok(None)
}
})
}
fn count(&self, _exact: bool) -> DbResult<u64> {
Ok(self.map.len() as u64)
}
fn apply_batch(&self, items: Vec<(KeyBytes, Option<ValueBytes>)>) -> DbResult<()> {
let decoded_items: Vec<(K, Option<T>)> = items
.into_iter()
.map(|(key, val)| {
if key.len() != size_of::<K>() {
return Err(DbError::Client("invalid key length in batch"));
}
let decoded_val = match val {
Some(v) => Some(
self.codec
.decode_from(&v)
.map_err(|_| DbError::Client("invalid value bytes in batch"))?,
),
None => None,
};
Ok((K::from_bytes(&key), decoded_val))
})
.collect::<DbResult<_>>()?;
for (k, val) in decoded_items {
match val {
Some(decoded) => {
self.map.put(&k, decoded)?;
}
None => {
self.map.delete(&k)?;
}
}
}
Ok(())
}
}
pub struct ZeroTreeHandler<
K,
const V: usize,
T,
H: TypedWriteHook<K, T> = crate::NoHook,
D: crate::durability::Durability = crate::durability::Bitcask,
> where
K: Key + Ord,
T: Copy + zerocopy::IntoBytes + zerocopy::FromBytes + zerocopy::Immutable + Send + Sync,
{
pub name: String,
pub typ_hash: u64,
pub ty: armour_core::Typ,
pub key_scheme: armour_core::KeyScheme,
pub version: u16,
pub tree: Arc<ZeroTree<K, V, T, H, D>>,
pub seq: Arc<super::seq::SeqGen>,
}
impl<K, const V: usize, T, H, D> RpcHandler for ZeroTreeHandler<K, V, T, H, D>
where
K: Key + Ord + Send + Sync,
T: Copy + zerocopy::IntoBytes + zerocopy::FromBytes + zerocopy::Immutable + Send + Sync,
H: TypedWriteHook<K, T>,
D: crate::durability::Durability,
{
fn name(&self) -> &str {
&self.name
}
fn info(&self) -> (u64, u16) {
(self.typ_hash, self.version)
}
fn schema(&self) -> armour_rpc::SchemaResponse {
armour_rpc::SchemaResponse {
name: self.name.clone(),
version: self.version,
typ: self.ty,
key_scheme: self.key_scheme,
}
}
fn get(&self, key: &[u8]) -> DbResult<Option<ValueBytes>> {
let key: K = check_key_len(key)?;
match self.tree.get(&key) {
Some(val) => Ok(Some(to_bytes::<V, T>(&val).to_vec())),
None => Ok(None),
}
}
fn contains(&self, key: &[u8]) -> DbResult<bool> {
let k = check_key_len::<K>(key)?;
Ok(self.tree.contains(&k))
}
fn entry_len(&self, key: &[u8]) -> DbResult<Option<u32>> {
let k = check_key_len::<K>(key)?;
Ok(if self.tree.contains(&k) {
Some(V as u32)
} else {
None
})
}
fn first(&self) -> DbResult<Option<(KeyBytes, ValueBytes)>> {
match self.tree.first() {
Some((k, v)) => Ok(Some((k.as_bytes().to_vec(), to_bytes::<V, T>(&v).to_vec()))),
None => Ok(None),
}
}
fn last(&self) -> DbResult<Option<(KeyBytes, ValueBytes)>> {
match self.tree.last() {
Some((k, v)) => Ok(Some((k.as_bytes().to_vec(), to_bytes::<V, T>(&v).to_vec()))),
None => Ok(None),
}
}
fn range(
&self,
start: Bound<KeyBytes>,
end: Bound<KeyBytes>,
limit: u32,
) -> DbResult<Vec<(KeyBytes, ValueBytes)>> {
let sb = bound_to_key_bound::<K>(&start)?;
let eb = bound_to_key_bound::<K>(&end)?;
Ok(apply_limit(
self.tree
.range_bounds(sb.as_ref(), eb.as_ref())
.map(|(k, v)| (k.as_bytes().to_vec(), to_bytes::<V, T>(&v).to_vec())),
limit,
)
.collect())
}
fn range_keys(
&self,
start: Bound<KeyBytes>,
end: Bound<KeyBytes>,
limit: u32,
) -> DbResult<Vec<ValueBytes>> {
let sb = bound_to_key_bound::<K>(&start)?;
let eb = bound_to_key_bound::<K>(&end)?;
Ok(apply_limit(
self.tree
.range_bounds(sb.as_ref(), eb.as_ref())
.map(|(k, _)| k.as_bytes().to_vec()),
limit,
)
.collect())
}
fn upsert(
&self,
key: UpsertKey,
flag: Option<bool>,
value: ValueBytes,
) -> DbResult<ValueBytes> {
let (typed_key, key_bytes) = resolve_key::<K>(key, &self.seq, &self.name)?;
if value.len() != V {
return Err(DbError::Client("invalid value length for zero collection"));
}
let zero_val: T = from_value_bytes::<V, T>(
value[..V]
.try_into()
.map_err(|_| DbError::CorruptedEntry { offset: 0 })?,
);
match flag {
None => {
self.tree.put(&typed_key, &zero_val)?;
Ok(key_bytes)
}
Some(false) => self
.tree
.atomic(&typed_key, |shard| {
if shard.contains(&typed_key) {
return Err(DbError::KeyExists);
}
shard.put(&typed_key, &zero_val)?;
Ok(())
})
.map(|_| key_bytes),
Some(true) => self
.tree
.atomic(&typed_key, |shard| {
if !shard.contains(&typed_key) {
return Err(DbError::KeyNotFound);
}
shard.put(&typed_key, &zero_val)?;
Ok(())
})
.map(|_| key_bytes),
}
}
fn remove(&self, key: &[u8], soft: bool) -> DbResult<()> {
if soft {
return Err(DbError::NotImplemented);
}
let k = check_key_len::<K>(key)?;
self.tree.delete(&k).map(|_| ())
}
fn take(&self, key: &[u8], soft: bool) -> DbResult<Option<ValueBytes>> {
if soft {
return Err(DbError::NotImplemented);
}
let k = check_key_len::<K>(key)?;
self.tree.atomic(&k, |shard| {
let current = shard.get(&k);
let bytes = current.as_ref().map(|v| to_bytes::<V, T>(v).to_vec());
if current.is_some() {
shard.delete(&k)?;
}
Ok(bytes)
})
}
fn count(&self, exact: bool) -> DbResult<u64> {
Ok(if exact {
self.tree.iter().count() as u64
} else {
self.tree.len() as u64
})
}
fn apply_batch(&self, items: Vec<(KeyBytes, Option<ValueBytes>)>) -> DbResult<()> {
for (key, val) in &items {
if key.len() != size_of::<K>() {
return Err(DbError::Client("invalid key length in batch"));
}
if let Some(v) = val
&& v.len() != V
{
return Err(DbError::Client("invalid value length in batch"));
}
}
for (key_bytes, val) in items {
let k = K::from_bytes(&key_bytes);
match val {
Some(v) => {
let zv: T = from_value_bytes::<V, T>(
v[..V]
.try_into()
.map_err(|_| DbError::CorruptedEntry { offset: 0 })?,
);
self.tree.put(&k, &zv)?;
}
None => {
self.tree.delete(&k)?;
}
}
}
Ok(())
}
}
pub struct ZeroMapHandler<
K,
const V: usize,
T,
H: TypedWriteHook<K, T> = crate::NoHook,
D: crate::durability::Durability = crate::durability::Bitcask,
> where
K: Key + Send + Sync + Hash + Eq,
T: Copy + zerocopy::IntoBytes + zerocopy::FromBytes + zerocopy::Immutable + Send + Sync,
{
pub name: String,
pub typ_hash: u64,
pub ty: armour_core::Typ,
pub key_scheme: armour_core::KeyScheme,
pub version: u16,
pub map: Arc<ZeroMap<K, V, T, H, D>>,
pub seq: Arc<super::seq::SeqGen>,
}
impl<K, const V: usize, T, H, D> RpcHandler for ZeroMapHandler<K, V, T, H, D>
where
K: Key + Send + Sync + Hash + Eq,
T: Copy + zerocopy::IntoBytes + zerocopy::FromBytes + zerocopy::Immutable + Send + Sync,
H: TypedWriteHook<K, T>,
D: crate::durability::Durability,
{
fn name(&self) -> &str {
&self.name
}
fn info(&self) -> (u64, u16) {
(self.typ_hash, self.version)
}
fn schema(&self) -> armour_rpc::SchemaResponse {
armour_rpc::SchemaResponse {
name: self.name.clone(),
version: self.version,
typ: self.ty,
key_scheme: self.key_scheme,
}
}
fn get(&self, key: &[u8]) -> DbResult<Option<ValueBytes>> {
let key: K = check_key_len(key)?;
match self.map.get(&key) {
Some(val) => Ok(Some(to_bytes::<V, T>(&val).to_vec())),
None => Ok(None),
}
}
fn contains(&self, key: &[u8]) -> DbResult<bool> {
let k = check_key_len::<K>(key)?;
Ok(self.map.contains(&k))
}
fn entry_len(&self, key: &[u8]) -> DbResult<Option<u32>> {
let k = check_key_len::<K>(key)?;
Ok(if self.map.contains(&k) {
Some(V as u32)
} else {
None
})
}
fn first(&self) -> DbResult<Option<(KeyBytes, ValueBytes)>> {
unsupported("first")?;
unreachable!()
}
fn last(&self) -> DbResult<Option<(KeyBytes, ValueBytes)>> {
unsupported("last")?;
unreachable!()
}
fn range(
&self,
_start: Bound<KeyBytes>,
_end: Bound<KeyBytes>,
_limit: u32,
) -> DbResult<Vec<(KeyBytes, ValueBytes)>> {
unsupported("range")?;
unreachable!()
}
fn range_keys(
&self,
_start: Bound<KeyBytes>,
_end: Bound<KeyBytes>,
_limit: u32,
) -> DbResult<Vec<ValueBytes>> {
unsupported("range_keys")?;
unreachable!()
}
fn upsert(
&self,
key: UpsertKey,
flag: Option<bool>,
value: ValueBytes,
) -> DbResult<ValueBytes> {
let (typed_key, key_bytes) = resolve_key::<K>(key, &self.seq, &self.name)?;
if value.len() != V {
return Err(DbError::Client("invalid value length for zero collection"));
}
let zero_val: T = from_value_bytes::<V, T>(
value[..V]
.try_into()
.map_err(|_| DbError::CorruptedEntry { offset: 0 })?,
);
match flag {
None => {
self.map.put(&typed_key, &zero_val)?;
Ok(key_bytes)
}
Some(false) => self
.map
.atomic(&typed_key, |shard| {
if shard.contains(&typed_key) {
return Err(DbError::KeyExists);
}
shard.put(&typed_key, &zero_val)?;
Ok(())
})
.map(|_| key_bytes),
Some(true) => self
.map
.atomic(&typed_key, |shard| {
if !shard.contains(&typed_key) {
return Err(DbError::KeyNotFound);
}
shard.put(&typed_key, &zero_val)?;
Ok(())
})
.map(|_| key_bytes),
}
}
fn remove(&self, key: &[u8], soft: bool) -> DbResult<()> {
if soft {
return Err(DbError::NotImplemented);
}
let k = check_key_len::<K>(key)?;
self.map.delete(&k).map(|_| ())
}
fn take(&self, key: &[u8], soft: bool) -> DbResult<Option<ValueBytes>> {
if soft {
return Err(DbError::NotImplemented);
}
let k = check_key_len::<K>(key)?;
self.map.atomic(&k, |shard| {
let current = shard.get(&k);
let bytes = current.as_ref().map(|v| to_bytes::<V, T>(v).to_vec());
if current.is_some() {
shard.delete(&k)?;
}
Ok(bytes)
})
}
fn count(&self, _exact: bool) -> DbResult<u64> {
Ok(self.map.len() as u64)
}
fn apply_batch(&self, items: Vec<(KeyBytes, Option<ValueBytes>)>) -> DbResult<()> {
for (key, val) in &items {
if key.len() != size_of::<K>() {
return Err(DbError::Client("invalid key length in batch"));
}
if let Some(v) = val
&& v.len() != V
{
return Err(DbError::Client("invalid value length in batch"));
}
}
for (key_bytes, val) in items {
let k = K::from_bytes(&key_bytes);
match val {
Some(v) => {
let zv: T = from_value_bytes::<V, T>(
v[..V]
.try_into()
.map_err(|_| DbError::CorruptedEntry { offset: 0 })?,
);
self.map.put(&k, &zv)?;
}
None => {
self.map.delete(&k)?;
}
}
}
Ok(())
}
}
#[cfg(feature = "var-collections")]
pub struct VarTypedTreeHandler<K, T, C, H: TypedWriteHook<K, T> = crate::NoHook>
where
K: Key + Ord,
T: Send + Sync,
C: Codec<T> + Clone,
{
pub name: String,
pub typ_hash: u64,
pub ty: armour_core::Typ,
pub key_scheme: armour_core::KeyScheme,
pub version: u16,
pub tree: Arc<crate::VarTypedTree<K, T, C, H>>,
pub codec: Arc<C>,
pub seq: Arc<super::seq::SeqGen>,
}
#[cfg(feature = "var-collections")]
impl<K, T, C, H> RpcHandler for VarTypedTreeHandler<K, T, C, H>
where
K: Key + Ord + Send + Sync,
T: Send + Sync,
C: Codec<T> + Clone,
H: TypedWriteHook<K, T>,
{
fn name(&self) -> &str {
&self.name
}
fn info(&self) -> (u64, u16) {
(self.typ_hash, self.version)
}
fn schema(&self) -> armour_rpc::SchemaResponse {
armour_rpc::SchemaResponse {
name: self.name.clone(),
version: self.version,
typ: self.ty,
key_scheme: self.key_scheme,
}
}
fn get(&self, key: &[u8]) -> DbResult<Option<ValueBytes>> {
let key: K = check_key_len(key)?;
match self.tree.get(&key) {
Some(val) => {
let mut buf = Vec::new();
self.codec.encode_to(&val, &mut buf)?;
Ok(Some(buf))
}
None => Ok(None),
}
}
fn contains(&self, key: &[u8]) -> DbResult<bool> {
let k = check_key_len::<K>(key)?;
Ok(self.tree.contains(&k))
}
fn entry_len(&self, key: &[u8]) -> DbResult<Option<u32>> {
let k = check_key_len::<K>(key)?;
Ok(self.tree.entry_len(&k))
}
fn first(&self) -> DbResult<Option<(KeyBytes, ValueBytes)>> {
match self.tree.first() {
Some((k, v)) => {
let mut buf = Vec::new();
self.codec.encode_to(&v, &mut buf)?;
Ok(Some((k.as_bytes().to_vec(), buf)))
}
None => Ok(None),
}
}
fn last(&self) -> DbResult<Option<(KeyBytes, ValueBytes)>> {
match self.tree.last() {
Some((k, v)) => {
let mut buf = Vec::new();
self.codec.encode_to(&v, &mut buf)?;
Ok(Some((k.as_bytes().to_vec(), buf)))
}
None => Ok(None),
}
}
fn range(
&self,
start: Bound<KeyBytes>,
end: Bound<KeyBytes>,
limit: u32,
) -> DbResult<Vec<(KeyBytes, ValueBytes)>> {
let sb = bound_to_key_bound::<K>(&start)?;
let eb = bound_to_key_bound::<K>(&end)?;
let iter = self.tree.range_bounds(sb.as_ref(), eb.as_ref());
let mut result = Vec::new();
for (k, v) in apply_limit(iter, limit) {
let mut buf = Vec::new();
self.codec.encode_to(&v, &mut buf)?;
result.push((k.as_bytes().to_vec(), buf));
}
Ok(result)
}
fn range_keys(
&self,
start: Bound<KeyBytes>,
end: Bound<KeyBytes>,
limit: u32,
) -> DbResult<Vec<ValueBytes>> {
let sb = bound_to_key_bound::<K>(&start)?;
let eb = bound_to_key_bound::<K>(&end)?;
let iter = self.tree.range_bounds(sb.as_ref(), eb.as_ref());
Ok(apply_limit(iter, limit)
.map(|(k, _)| k.as_bytes().to_vec())
.collect())
}
fn upsert(
&self,
key: UpsertKey,
flag: Option<bool>,
value: ValueBytes,
) -> DbResult<ValueBytes> {
let (typed_key, key_bytes) = resolve_key::<K>(key, &self.seq, &self.name)?;
let decoded = self.codec.decode_from(&value)?;
match flag {
None => {
self.tree.put(&typed_key, &decoded)?;
Ok(key_bytes)
}
Some(false) => self
.tree
.atomic(&typed_key, |shard| {
if shard.contains(&typed_key) {
return Err(DbError::KeyExists);
}
shard.put(&typed_key, &decoded)?;
Ok(())
})
.map(|_| key_bytes),
Some(true) => self
.tree
.atomic(&typed_key, |shard| {
if !shard.contains(&typed_key) {
return Err(DbError::KeyNotFound);
}
shard.put(&typed_key, &decoded)?;
Ok(())
})
.map(|_| key_bytes),
}
}
fn remove(&self, key: &[u8], soft: bool) -> DbResult<()> {
if soft {
return Err(DbError::NotImplemented);
}
let k = check_key_len::<K>(key)?;
self.tree.delete(&k).map(|_| ())
}
fn take(&self, key: &[u8], soft: bool) -> DbResult<Option<ValueBytes>> {
if soft {
return Err(DbError::NotImplemented);
}
let k = check_key_len::<K>(key)?;
let codec = Arc::clone(&self.codec);
self.tree.atomic(&k, |shard| {
let current = shard.get(&k);
let bytes = if let Some(ref v) = current {
let mut buf = Vec::new();
codec.encode_to(v, &mut buf)?;
Some(buf)
} else {
None
};
if current.is_some() {
shard.delete(&k)?;
}
Ok(bytes)
})
}
fn count(&self, exact: bool) -> DbResult<u64> {
Ok(if exact {
self.tree.iter().count() as u64
} else {
self.tree.len() as u64
})
}
fn apply_batch(&self, items: Vec<(KeyBytes, Option<ValueBytes>)>) -> DbResult<()> {
let decoded_items: Vec<(K, Option<T>)> = items
.into_iter()
.map(|(key, val)| {
if key.len() != size_of::<K>() {
return Err(DbError::Client("invalid key length in batch"));
}
let decoded_val = match val {
Some(v) => Some(
self.codec
.decode_from(&v)
.map_err(|_| DbError::Client("invalid value bytes in batch"))?,
),
None => None,
};
Ok((K::from_bytes(&key), decoded_val))
})
.collect::<DbResult<_>>()?;
for (k, val) in decoded_items {
match val {
Some(decoded) => {
self.tree.put(&k, &decoded)?;
}
None => {
self.tree.delete(&k)?;
}
}
}
Ok(())
}
}
#[cfg(feature = "var-collections")]
pub struct VarTypedMapHandler<K, T, C, H: TypedWriteHook<K, T> = crate::NoHook>
where
K: Key + Send + Sync + Hash + Eq,
T: Send + Sync,
C: Codec<T> + Clone,
{
pub name: String,
pub typ_hash: u64,
pub ty: armour_core::Typ,
pub key_scheme: armour_core::KeyScheme,
pub version: u16,
pub map: Arc<crate::VarTypedMap<K, T, C, H>>,
pub codec: Arc<C>,
pub seq: Arc<super::seq::SeqGen>,
}
#[cfg(feature = "var-collections")]
impl<K, T, C, H> RpcHandler for VarTypedMapHandler<K, T, C, H>
where
K: Key + Send + Sync + Hash + Eq,
T: Send + Sync,
C: Codec<T> + Clone,
H: TypedWriteHook<K, T>,
{
fn name(&self) -> &str {
&self.name
}
fn info(&self) -> (u64, u16) {
(self.typ_hash, self.version)
}
fn schema(&self) -> armour_rpc::SchemaResponse {
armour_rpc::SchemaResponse {
name: self.name.clone(),
version: self.version,
typ: self.ty,
key_scheme: self.key_scheme,
}
}
fn get(&self, key: &[u8]) -> DbResult<Option<ValueBytes>> {
let key: K = check_key_len(key)?;
match self.map.get(&key) {
Some(val) => {
let mut buf = Vec::new();
self.codec.encode_to(&val, &mut buf)?;
Ok(Some(buf))
}
None => Ok(None),
}
}
fn contains(&self, key: &[u8]) -> DbResult<bool> {
let k = check_key_len::<K>(key)?;
Ok(self.map.contains(&k))
}
fn entry_len(&self, key: &[u8]) -> DbResult<Option<u32>> {
let k = check_key_len::<K>(key)?;
Ok(self.map.entry_len(&k))
}
fn first(&self) -> DbResult<Option<(KeyBytes, ValueBytes)>> {
unsupported("first")?;
unreachable!()
}
fn last(&self) -> DbResult<Option<(KeyBytes, ValueBytes)>> {
unsupported("last")?;
unreachable!()
}
fn range(
&self,
_s: Bound<KeyBytes>,
_e: Bound<KeyBytes>,
_limit: u32,
) -> DbResult<Vec<(KeyBytes, ValueBytes)>> {
unsupported("range")?;
unreachable!()
}
fn range_keys(
&self,
_s: Bound<KeyBytes>,
_e: Bound<KeyBytes>,
_limit: u32,
) -> DbResult<Vec<ValueBytes>> {
unsupported("range_keys")?;
unreachable!()
}
fn upsert(
&self,
key: UpsertKey,
flag: Option<bool>,
value: ValueBytes,
) -> DbResult<ValueBytes> {
let (typed_key, key_bytes) = resolve_key::<K>(key, &self.seq, &self.name)?;
let decoded = self.codec.decode_from(&value)?;
match flag {
None => {
self.map.put(&typed_key, &decoded)?;
Ok(key_bytes)
}
Some(false) => self
.map
.atomic(&typed_key, |shard| {
if shard.contains(&typed_key) {
return Err(DbError::KeyExists);
}
shard.put(&typed_key, &decoded)?;
Ok(())
})
.map(|_| key_bytes),
Some(true) => self
.map
.atomic(&typed_key, |shard| {
if !shard.contains(&typed_key) {
return Err(DbError::KeyNotFound);
}
shard.put(&typed_key, &decoded)?;
Ok(())
})
.map(|_| key_bytes),
}
}
fn remove(&self, key: &[u8], soft: bool) -> DbResult<()> {
if soft {
return Err(DbError::NotImplemented);
}
let k = check_key_len::<K>(key)?;
self.map.delete(&k).map(|_| ())
}
fn take(&self, key: &[u8], soft: bool) -> DbResult<Option<ValueBytes>> {
if soft {
return Err(DbError::NotImplemented);
}
let k = check_key_len::<K>(key)?;
let codec = Arc::clone(&self.codec);
self.map.atomic(&k, |shard| {
let current = shard.get(&k);
let bytes = if let Some(ref v) = current {
let mut buf = Vec::new();
codec.encode_to(v, &mut buf)?;
Some(buf)
} else {
None
};
if current.is_some() {
shard.delete(&k)?;
}
Ok(bytes)
})
}
fn count(&self, _exact: bool) -> DbResult<u64> {
Ok(self.map.len() as u64)
}
fn apply_batch(&self, items: Vec<(KeyBytes, Option<ValueBytes>)>) -> DbResult<()> {
let decoded_items: Vec<(K, Option<T>)> = items
.into_iter()
.map(|(key, val)| {
if key.len() != size_of::<K>() {
return Err(DbError::Client("invalid key length in batch"));
}
let decoded_val = match val {
Some(v) => Some(
self.codec
.decode_from(&v)
.map_err(|_| DbError::Client("invalid value bytes in batch"))?,
),
None => None,
};
Ok((K::from_bytes(&key), decoded_val))
})
.collect::<DbResult<_>>()?;
for (k, val) in decoded_items {
match val {
Some(decoded) => {
self.map.put(&k, &decoded)?;
}
None => {
self.map.delete(&k)?;
}
}
}
Ok(())
}
}