use std::marker::PhantomData;
use std::ops::Bound;
use std::path::Path;
use armour_rpc::{Result, RpcClient, RpcError, UpsertKey};
use compio::io::{AsyncRead, AsyncWrite};
use compio::net::{OwnedReadHalf, OwnedWriteHalf, TcpStream, ToSocketAddrsAsync, UnixStream};
use crate::{Cid, Record};
pub struct TypedRpcClient<Item: Record, R, W> {
inner: RpcClient<R, W>,
_phantom: PhantomData<Item>,
}
fn encode_bound<Id: Cid>(bound: Bound<&Id>) -> Bound<Vec<u8>> {
match bound {
Bound::Unbounded => Bound::Unbounded,
Bound::Included(id) => Bound::Included(id.encode().as_ref().to_vec()),
Bound::Excluded(id) => Bound::Excluded(id.encode().as_ref().to_vec()),
}
}
impl<Item: Record> TypedRpcClient<Item, OwnedReadHalf<TcpStream>, OwnedWriteHalf<TcpStream>> {
pub async fn connect_tcp(addr: impl ToSocketAddrsAsync) -> Result<Self> {
let inner = RpcClient::connect_tcp_by_name(addr, Item::NAME).await?;
Ok(Self {
inner,
_phantom: PhantomData,
})
}
}
impl<Item: Record> TypedRpcClient<Item, OwnedReadHalf<UnixStream>, OwnedWriteHalf<UnixStream>> {
pub async fn connect_uds(path: impl AsRef<Path>) -> Result<Self> {
let inner = RpcClient::connect_uds_by_name(path, Item::NAME).await?;
Ok(Self {
inner,
_phantom: PhantomData,
})
}
}
impl<Item, R, W> TypedRpcClient<Item, R, W>
where
Item: Record,
Item::Value: AsRef<[u8]> + From<Vec<u8>>,
R: AsyncRead + Unpin + 'static,
W: AsyncWrite + Unpin + 'static,
{
pub async fn get(&mut self, id: &Item::SelfId) -> Result<Option<Item>> {
let key = id.encode();
let data = self.inner.get(key.as_ref()).await?;
match data {
None => Ok(None),
Some(bytes) => {
let value = Item::Value::from(bytes);
let item =
Item::try_deser(&value).map_err(|e| RpcError::Encoding(e.to_string()))?;
Ok(Some(item))
}
}
}
pub async fn contains(&mut self, id: &Item::SelfId) -> Result<Option<u32>> {
let key = id.encode();
self.inner.contains(key.as_ref()).await
}
pub async fn first(&mut self) -> Result<Option<(Item::SelfId, Item)>> {
let data = self.inner.first().await?;
decode_kv::<Item>(data)
}
pub async fn last(&mut self) -> Result<Option<(Item::SelfId, Item)>> {
let data = self.inner.last().await?;
decode_kv::<Item>(data)
}
pub async fn range(
&mut self,
start: Bound<&Item::SelfId>,
end: Bound<&Item::SelfId>,
) -> Result<Vec<(Item::SelfId, Item)>> {
let start = encode_bound::<Item::SelfId>(start);
let end = encode_bound::<Item::SelfId>(end);
let data = self.inner.range(start, end).await?;
data.into_iter()
.map(|(key, val)| {
let id =
Item::try_deser_key(&key).map_err(|e| RpcError::Encoding(e.to_string()))?;
let value = Item::Value::from(val);
let item =
Item::try_deser(&value).map_err(|e| RpcError::Encoding(e.to_string()))?;
Ok((id, item))
})
.collect()
}
pub async fn range_keys(
&mut self,
start: Bound<&Item::SelfId>,
end: Bound<&Item::SelfId>,
) -> Result<Vec<Item::SelfId>> {
let start = encode_bound::<Item::SelfId>(start);
let end = encode_bound::<Item::SelfId>(end);
let data = self.inner.range_keys(start, end).await?;
data.into_iter()
.map(|key| Item::try_deser_key(&key).map_err(|e| RpcError::Encoding(e.to_string())))
.collect()
}
pub async fn count(&mut self, exact: bool) -> Result<u64> {
self.inner.count(exact).await
}
pub async fn upsert(&mut self, id: &Item::SelfId, item: &Item) -> Result<()> {
let key = id.encode();
let value = item.ser();
self.inner
.upsert(
UpsertKey::Provided(key.as_ref().to_vec()),
None,
value.as_ref().to_vec(),
)
.await?;
Ok(())
}
pub async fn insert(&mut self, id: &Item::SelfId, item: &Item) -> Result<()> {
let key = id.encode();
let value = item.ser();
self.inner
.upsert(
UpsertKey::Provided(key.as_ref().to_vec()),
Some(false),
value.as_ref().to_vec(),
)
.await?;
Ok(())
}
pub async fn update(&mut self, id: &Item::SelfId, item: &Item) -> Result<()> {
let key = id.encode();
let value = item.ser();
self.inner
.upsert(
UpsertKey::Provided(key.as_ref().to_vec()),
Some(true),
value.as_ref().to_vec(),
)
.await?;
Ok(())
}
pub async fn remove(&mut self, id: &Item::SelfId) -> Result<()> {
let key = id.encode();
self.inner.remove(key.as_ref(), false).await
}
pub async fn take(&mut self, id: &Item::SelfId) -> Result<Option<Item>> {
let key = id.encode();
let data = self.inner.take(key.as_ref(), false).await?;
match data {
None => Ok(None),
Some(bytes) => {
let value = Item::Value::from(bytes);
let item =
Item::try_deser(&value).map_err(|e| RpcError::Encoding(e.to_string()))?;
Ok(Some(item))
}
}
}
pub async fn apply_batch(
&mut self,
items: impl Iterator<Item = (Item::SelfId, Option<Item>)>,
) -> Result<()> {
let raw: Vec<(Vec<u8>, Option<Vec<u8>>)> = items
.map(|(id, val)| {
let key = id.encode().as_ref().to_vec();
let value = val.map(|v| v.ser().as_ref().to_vec());
(key, value)
})
.collect();
self.inner.apply_batch(raw).await
}
}
fn decode_kv<Item>(data: Option<(Vec<u8>, Vec<u8>)>) -> Result<Option<(Item::SelfId, Item)>>
where
Item: Record,
Item::Value: From<Vec<u8>>,
{
match data {
None => Ok(None),
Some((key, val)) => {
let id = Item::try_deser_key(&key).map_err(|e| RpcError::Encoding(e.to_string()))?;
let value = Item::Value::from(val);
let item = Item::try_deser(&value).map_err(|e| RpcError::Encoding(e.to_string()))?;
Ok(Some((id, item)))
}
}
}