armdb 0.1.12

sharded bitcask key-value storage optimized for NVMe
Documentation
use std::marker::PhantomData;
use std::ops::Bound;
use std::path::Path;

use armour_rpc::{Result, RpcClient, RpcError, UpsertKey};
use compio::io::{AsyncRead, AsyncWrite};
use compio::net::{OwnedReadHalf, OwnedWriteHalf, TcpStream, ToSocketAddrsAsync, UnixStream};

use crate::{Codec, CollectionMeta, Key};

pub struct TypedRpcClient<Item, C, R, W>
where
    Item: CollectionMeta,
    C: Codec<Item>,
{
    inner: RpcClient<R, W>,
    codec: C,
    _phantom: PhantomData<Item>,
}

fn encode_bound<K: Key>(bound: Bound<&K>) -> Bound<Vec<u8>> {
    match bound {
        Bound::Unbounded => Bound::Unbounded,
        Bound::Included(k) => Bound::Included(k.as_bytes().to_vec()),
        Bound::Excluded(k) => Bound::Excluded(k.as_bytes().to_vec()),
    }
}

impl<Item, C> TypedRpcClient<Item, C, OwnedReadHalf<TcpStream>, OwnedWriteHalf<TcpStream>>
where
    Item: CollectionMeta,
    C: Codec<Item> + Default,
{
    pub async fn connect_tcp(addr: impl ToSocketAddrsAsync) -> Result<Self> {
        let inner = RpcClient::connect_tcp_by_name(addr, Item::NAME).await?;
        Ok(Self {
            inner,
            codec: C::default(),
            _phantom: PhantomData,
        })
    }
}

impl<Item, C> TypedRpcClient<Item, C, OwnedReadHalf<UnixStream>, OwnedWriteHalf<UnixStream>>
where
    Item: CollectionMeta,
    C: Codec<Item> + Default,
{
    pub async fn connect_uds(path: impl AsRef<Path>) -> Result<Self> {
        let inner = RpcClient::connect_uds_by_name(path, Item::NAME).await?;
        Ok(Self {
            inner,
            codec: C::default(),
            _phantom: PhantomData,
        })
    }
}

impl<Item, C, R, W> TypedRpcClient<Item, C, R, W>
where
    Item: CollectionMeta,
    C: Codec<Item>,
    R: AsyncRead + Unpin + 'static,
    W: AsyncWrite + Unpin + 'static,
{
    pub async fn get(&mut self, id: &Item::SelfId) -> Result<Option<Item>> {
        let data = self.inner.get(id.as_bytes()).await?;
        match data {
            None => Ok(None),
            Some(bytes) => {
                let item = self
                    .codec
                    .decode_from(&bytes)
                    .map_err(|e| RpcError::Encoding(e.to_string()))?;
                Ok(Some(item))
            }
        }
    }

    pub async fn contains(&mut self, id: &Item::SelfId) -> Result<Option<u32>> {
        self.inner.contains(id.as_bytes()).await
    }

    pub async fn first(&mut self) -> Result<Option<(Item::SelfId, Item)>> {
        let data = self.inner.first().await?;
        self.decode_kv(data)
    }

    pub async fn last(&mut self) -> Result<Option<(Item::SelfId, Item)>> {
        let data = self.inner.last().await?;
        self.decode_kv(data)
    }

    pub async fn range(
        &mut self,
        start: Bound<&Item::SelfId>,
        end: Bound<&Item::SelfId>,
    ) -> Result<Vec<(Item::SelfId, Item)>> {
        let start = encode_bound::<Item::SelfId>(start);
        let end = encode_bound::<Item::SelfId>(end);
        let data = self.inner.range(start, end).await?;
        data.into_iter()
            .map(|(key, val)| {
                let id = Item::SelfId::from_bytes(&key);
                let item = self
                    .codec
                    .decode_from(&val)
                    .map_err(|e| RpcError::Encoding(e.to_string()))?;
                Ok((id, item))
            })
            .collect()
    }

    pub async fn range_keys(
        &mut self,
        start: Bound<&Item::SelfId>,
        end: Bound<&Item::SelfId>,
    ) -> Result<Vec<Item::SelfId>> {
        let start = encode_bound::<Item::SelfId>(start);
        let end = encode_bound::<Item::SelfId>(end);
        let data = self.inner.range_keys(start, end).await?;
        Ok(data
            .iter()
            .map(|key| Item::SelfId::from_bytes(key))
            .collect())
    }

    pub async fn count(&mut self, exact: bool) -> Result<u64> {
        self.inner.count(exact).await
    }

    pub async fn upsert(&mut self, id: &Item::SelfId, item: &Item) -> Result<()> {
        let value = self.encode_value(item);
        self.inner
            .upsert(UpsertKey::Provided(id.as_bytes().to_vec()), None, value)
            .await?;
        Ok(())
    }

    pub async fn insert(&mut self, id: &Item::SelfId, item: &Item) -> Result<()> {
        let value = self.encode_value(item);
        self.inner
            .upsert(
                UpsertKey::Provided(id.as_bytes().to_vec()),
                Some(false),
                value,
            )
            .await?;
        Ok(())
    }

    pub async fn update(&mut self, id: &Item::SelfId, item: &Item) -> Result<()> {
        let value = self.encode_value(item);
        self.inner
            .upsert(
                UpsertKey::Provided(id.as_bytes().to_vec()),
                Some(true),
                value,
            )
            .await?;
        Ok(())
    }

    pub async fn remove(&mut self, id: &Item::SelfId) -> Result<()> {
        self.inner.remove(id.as_bytes(), false).await
    }

    pub async fn take(&mut self, id: &Item::SelfId) -> Result<Option<Item>> {
        let data = self.inner.take(id.as_bytes(), false).await?;
        match data {
            None => Ok(None),
            Some(bytes) => {
                let item = self
                    .codec
                    .decode_from(&bytes)
                    .map_err(|e| RpcError::Encoding(e.to_string()))?;
                Ok(Some(item))
            }
        }
    }

    pub async fn apply_batch(
        &mut self,
        items: impl Iterator<Item = (Item::SelfId, Option<Item>)>,
    ) -> Result<()> {
        let raw: Vec<(Vec<u8>, Option<Vec<u8>>)> = items
            .map(|(id, val)| {
                let key = id.as_bytes().to_vec();
                let value = val.map(|v| self.encode_value(&v));
                (key, value)
            })
            .collect();
        self.inner.apply_batch(raw).await
    }

    fn decode_kv(&self, data: Option<(Vec<u8>, Vec<u8>)>) -> Result<Option<(Item::SelfId, Item)>> {
        match data {
            None => Ok(None),
            Some((key, val)) => {
                let id = Item::SelfId::from_bytes(&key);
                let item = self
                    .codec
                    .decode_from(&val)
                    .map_err(|e| RpcError::Encoding(e.to_string()))?;
                Ok(Some((id, item)))
            }
        }
    }

    fn encode_value(&self, item: &Item) -> Vec<u8> {
        let mut buf = Vec::new();
        self.codec.encode_to(item, &mut buf);
        buf
    }
}