armour 0.30.27

DDL and serialization for key-value storage
Documentation
use std::marker::PhantomData;
use std::ops::Bound;
use std::path::Path;

use armour_rpc::{Result, RpcClient, RpcError, UpsertKey};
use compio::io::{AsyncRead, AsyncWrite};
use compio::net::{OwnedReadHalf, OwnedWriteHalf, TcpStream, ToSocketAddrsAsync, UnixStream};

use crate::{Cid, Record};

/// RPC client wrapper whose methods work with typed `Item` records and their ids
/// instead of raw byte buffers.
///
/// `R` and `W` are the read and write halves handed to the wrapped [`RpcClient`].
pub struct TypedRpcClient<Item: Record, R, W> {
    /// Untyped client every request is delegated to.
    inner: RpcClient<R, W>,
    /// Binds the client to `Item` without storing a value of that type.
    _phantom: PhantomData<Item>,
}

/// Converts a range bound over a typed id into the same kind of bound over the
/// id's encoded bytes.
///
/// `Included` / `Excluded` keep their variant and carry an owned copy of
/// `id.encode()`; `Unbounded` is passed through unchanged.
fn encode_bound<Id: Cid>(bound: Bound<&Id>) -> Bound<Vec<u8>> {
    // Single place where an id is turned into an owned byte buffer.
    let to_bytes = |id: &Id| -> Vec<u8> { id.encode().as_ref().to_vec() };

    match bound {
        Bound::Included(id) => Bound::Included(to_bytes(id)),
        Bound::Excluded(id) => Bound::Excluded(to_bytes(id)),
        Bound::Unbounded => Bound::Unbounded,
    }
}

impl<Item: Record> TypedRpcClient<Item, OwnedReadHalf<TcpStream>, OwnedWriteHalf<TcpStream>> {
    /// Opens a typed client over TCP.
    ///
    /// Delegates to `RpcClient::connect_tcp_by_name`, passing `addr` and
    /// `Item::NAME` (presumably the name of the collection to bind to — confirm
    /// against `armour_rpc`).
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying connect call reports.
    pub async fn connect_tcp(addr: impl ToSocketAddrsAsync) -> Result<Self> {
        let client = Self {
            inner: RpcClient::connect_tcp_by_name(addr, Item::NAME).await?,
            _phantom: PhantomData,
        };
        Ok(client)
    }
}

impl<Item: Record> TypedRpcClient<Item, OwnedReadHalf<UnixStream>, OwnedWriteHalf<UnixStream>> {
    /// Opens a typed client over a Unix domain socket.
    ///
    /// Delegates to `RpcClient::connect_uds_by_name`, passing `path` and
    /// `Item::NAME` (presumably the name of the collection to bind to — confirm
    /// against `armour_rpc`).
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying connect call reports.
    pub async fn connect_uds(path: impl AsRef<Path>) -> Result<Self> {
        let client = Self {
            inner: RpcClient::connect_uds_by_name(path, Item::NAME).await?,
            _phantom: PhantomData,
        };
        Ok(client)
    }
}

impl<Item, R, W> TypedRpcClient<Item, R, W>
where
    Item: Record,
    Item::Value: AsRef<[u8]> + From<Vec<u8>>,
    R: AsyncRead + Unpin + 'static,
    W: AsyncWrite + Unpin + 'static,
{
    /// Rebuilds an `Item` from the raw value bytes returned by the server.
    ///
    /// Shared by every method that receives a value, so that the conversion and
    /// its error mapping live in exactly one place.
    fn decode_item(bytes: Vec<u8>) -> Result<Item> {
        let value = Item::Value::from(bytes);
        let item = Item::try_deser(&value).map_err(|e| RpcError::Encoding(e.to_string()))?;
        Ok(item)
    }

    /// Encodes `id` and `item` and sends them through `RpcClient::upsert`.
    ///
    /// `flag` is forwarded unchanged as the second argument of that call.
    /// NOTE(review): it presumably selects whether the key must already exist —
    /// confirm against `armour_rpc`. The value returned by the underlying call
    /// is discarded.
    async fn store(
        &mut self,
        id: &Item::SelfId,
        item: &Item,
        flag: Option<bool>,
    ) -> Result<()> {
        let key = id.encode();
        let value = item.ser();
        self.inner
            .upsert(
                UpsertKey::Provided(key.as_ref().to_vec()),
                flag,
                value.as_ref().to_vec(),
            )
            .await?;
        Ok(())
    }

    /// Fetches the record stored under `id`.
    ///
    /// Returns `Ok(None)` when the underlying client reports no value.
    ///
    /// # Errors
    ///
    /// Propagates errors from the underlying request and returns
    /// `RpcError::Encoding` when the received bytes fail `Item::try_deser`.
    pub async fn get(&mut self, id: &Item::SelfId) -> Result<Option<Item>> {
        let key = id.encode();
        let data = self.inner.get(key.as_ref()).await?;
        data.map(Self::decode_item).transpose()
    }

    /// Forwards the encoded `id` to `RpcClient::contains` and returns its result
    /// unchanged.
    ///
    /// NOTE(review): the meaning of the `u32` payload is defined by `armour_rpc`
    /// and is not visible here.
    pub async fn contains(&mut self, id: &Item::SelfId) -> Result<Option<u32>> {
        let key = id.encode();
        self.inner.contains(key.as_ref()).await
    }

    /// Returns the decoded entry reported by `RpcClient::first`, if any.
    ///
    /// # Errors
    ///
    /// Propagates request errors; returns `RpcError::Encoding` when the key or
    /// the value cannot be decoded.
    pub async fn first(&mut self) -> Result<Option<(Item::SelfId, Item)>> {
        let data = self.inner.first().await?;
        decode_kv::<Item>(data)
    }

    /// Returns the decoded entry reported by `RpcClient::last`, if any.
    ///
    /// # Errors
    ///
    /// Propagates request errors; returns `RpcError::Encoding` when the key or
    /// the value cannot be decoded.
    pub async fn last(&mut self) -> Result<Option<(Item::SelfId, Item)>> {
        let data = self.inner.last().await?;
        decode_kv::<Item>(data)
    }

    /// Fetches and decodes every entry the server returns for the range
    /// `start..end`.
    ///
    /// Both bounds are encoded with `encode_bound` before being sent.
    ///
    /// # Errors
    ///
    /// Propagates request errors; the first key or value that fails to decode
    /// aborts the whole call with `RpcError::Encoding`.
    pub async fn range(
        &mut self,
        start: Bound<&Item::SelfId>,
        end: Bound<&Item::SelfId>,
    ) -> Result<Vec<(Item::SelfId, Item)>> {
        let start = encode_bound::<Item::SelfId>(start);
        let end = encode_bound::<Item::SelfId>(end);
        let data = self.inner.range(start, end).await?;
        data.into_iter()
            .map(|(key, val)| {
                let id =
                    Item::try_deser_key(&key).map_err(|e| RpcError::Encoding(e.to_string()))?;
                let item = Self::decode_item(val)?;
                Ok((id, item))
            })
            .collect()
    }

    /// Like [`Self::range`], but requests and decodes keys only.
    ///
    /// # Errors
    ///
    /// Propagates request errors; the first key that fails to decode aborts the
    /// whole call with `RpcError::Encoding`.
    pub async fn range_keys(
        &mut self,
        start: Bound<&Item::SelfId>,
        end: Bound<&Item::SelfId>,
    ) -> Result<Vec<Item::SelfId>> {
        let start = encode_bound::<Item::SelfId>(start);
        let end = encode_bound::<Item::SelfId>(end);
        let data = self.inner.range_keys(start, end).await?;
        data.into_iter()
            .map(|key| Item::try_deser_key(&key).map_err(|e| RpcError::Encoding(e.to_string())))
            .collect()
    }

    /// Returns the count reported by `RpcClient::count`; `exact` is forwarded
    /// unchanged.
    pub async fn count(&mut self, exact: bool) -> Result<u64> {
        self.inner.count(exact).await
    }

    /// Stores `item` under `id`, sending the upsert request with no flag (`None`).
    ///
    /// # Errors
    ///
    /// Propagates errors from the underlying request.
    pub async fn upsert(&mut self, id: &Item::SelfId, item: &Item) -> Result<()> {
        self.store(id, item, None).await
    }

    /// Stores `item` under `id`, sending the upsert request with the flag
    /// `Some(false)`.
    ///
    /// NOTE(review): presumably this makes the server reject an already existing
    /// key — confirm against `armour_rpc`.
    ///
    /// # Errors
    ///
    /// Propagates errors from the underlying request.
    pub async fn insert(&mut self, id: &Item::SelfId, item: &Item) -> Result<()> {
        self.store(id, item, Some(false)).await
    }

    /// Stores `item` under `id`, sending the upsert request with the flag
    /// `Some(true)`.
    ///
    /// NOTE(review): presumably this makes the server require an already
    /// existing key — confirm against `armour_rpc`.
    ///
    /// # Errors
    ///
    /// Propagates errors from the underlying request.
    pub async fn update(&mut self, id: &Item::SelfId, item: &Item) -> Result<()> {
        self.store(id, item, Some(true)).await
    }

    /// Removes the entry stored under `id` via `RpcClient::remove`.
    ///
    /// The second argument of the underlying call is always `false`; its meaning
    /// is defined by `armour_rpc`.
    pub async fn remove(&mut self, id: &Item::SelfId) -> Result<()> {
        let key = id.encode();
        self.inner.remove(key.as_ref(), false).await
    }

    /// Calls `RpcClient::take` for `id` and decodes the value it returns, if any.
    ///
    /// The second argument of the underlying call is always `false`; its meaning
    /// is defined by `armour_rpc`.
    ///
    /// # Errors
    ///
    /// Propagates request errors and returns `RpcError::Encoding` when the
    /// received bytes fail `Item::try_deser`.
    pub async fn take(&mut self, id: &Item::SelfId) -> Result<Option<Item>> {
        let key = id.encode();
        let data = self.inner.take(key.as_ref(), false).await?;
        data.map(Self::decode_item).transpose()
    }

    /// Encodes every `(id, item)` pair and submits them in one
    /// `RpcClient::apply_batch` call.
    ///
    /// Accepts anything iterable (a `Vec`, an array, an iterator, ...). A `None`
    /// item is sent as a key without a value (presumably a removal — confirm
    /// against `armour_rpc`).
    ///
    /// # Errors
    ///
    /// Propagates errors from the underlying request.
    pub async fn apply_batch(
        &mut self,
        items: impl IntoIterator<Item = (Item::SelfId, Option<Item>)>,
    ) -> Result<()> {
        let raw: Vec<(Vec<u8>, Option<Vec<u8>>)> = items
            .into_iter()
            .map(|(id, val)| {
                let key = id.encode().as_ref().to_vec();
                let value = val.map(|v| v.ser().as_ref().to_vec());
                (key, value)
            })
            .collect();
        self.inner.apply_batch(raw).await
    }
}

/// Decodes an optional raw `(key, value)` pair into a typed `(id, item)` pair.
///
/// `None` is passed through as `Ok(None)`. Otherwise the key goes through
/// `Item::try_deser_key` and the value through `Item::Value::from` followed by
/// `Item::try_deser`; a failure of either step is reported as
/// `RpcError::Encoding` carrying the error's display text.
fn decode_kv<Item>(data: Option<(Vec<u8>, Vec<u8>)>) -> Result<Option<(Item::SelfId, Item)>>
where
    Item: Record,
    Item::Value: From<Vec<u8>>,
{
    // Nothing to decode: hand the absence straight back to the caller.
    let Some((raw_key, raw_value)) = data else {
        return Ok(None);
    };

    let id = Item::try_deser_key(&raw_key).map_err(|err| RpcError::Encoding(err.to_string()))?;
    let value = Item::Value::from(raw_value);
    let item = Item::try_deser(&value).map_err(|err| RpcError::Encoding(err.to_string()))?;
    Ok(Some((id, item)))
}