use std::marker::PhantomData;
use std::ops::Bound;
use std::path::Path;
use armour_rpc::{Result, RpcClient, RpcError, UpsertKey};
use compio::io::{AsyncRead, AsyncWrite};
use compio::net::{OwnedReadHalf, OwnedWriteHalf, TcpStream, ToSocketAddrsAsync, UnixStream};
use crate::{Codec, CollectionMeta, Key};
/// Typed front-end over an untyped [`RpcClient`].
///
/// `Item` is the stored value type, `C` the codec that converts it to and
/// from bytes, and `R` / `W` the read and write halves of the transport used
/// by the wrapped client.
pub struct TypedRpcClient<Item: CollectionMeta, C: Codec<Item>, R, W> {
    /// Untyped client that every request is forwarded to.
    inner: RpcClient<R, W>,
    /// Codec used to encode `Item` values and decode received payloads.
    codec: C,
    /// Binds the client to `Item` without storing a value of that type.
    _phantom: PhantomData<Item>,
}
/// Turns a bound over a borrowed key into the same kind of bound over the
/// key's owned bytes.
///
/// `Included` / `Excluded` keep their variant and carry
/// `key.as_bytes().to_vec()`; `Unbounded` is passed through unchanged.
fn encode_bound<K: Key>(bound: Bound<&K>) -> Bound<Vec<u8>> {
    // Single place where a key is copied into an owned buffer.
    let to_bytes = |key: &K| key.as_bytes().to_vec();

    match bound {
        Bound::Included(key) => Bound::Included(to_bytes(key)),
        Bound::Excluded(key) => Bound::Excluded(to_bytes(key)),
        Bound::Unbounded => Bound::Unbounded,
    }
}
/// Constructor for clients whose transport is the owned read/write halves of
/// a `TcpStream`.
impl<Item, C> TypedRpcClient<Item, C, OwnedReadHalf<TcpStream>, OwnedWriteHalf<TcpStream>>
where
    Item: CollectionMeta,
    C: Codec<Item> + Default,
{
    /// Connects to `addr` with `RpcClient::connect_tcp_by_name`, passing
    /// `Item::NAME` (presumably the collection name — confirm against
    /// `CollectionMeta`), and pairs the connection with `C::default()`.
    ///
    /// # Errors
    /// Returns whatever error the underlying connect call reports.
    pub async fn connect_tcp(addr: impl ToSocketAddrsAsync) -> Result<Self> {
        let rpc = RpcClient::connect_tcp_by_name(addr, Item::NAME).await?;
        let client = Self {
            inner: rpc,
            codec: C::default(),
            _phantom: PhantomData,
        };
        Ok(client)
    }
}
/// Constructor for clients whose transport is the owned read/write halves of
/// a `UnixStream`.
impl<Item, C> TypedRpcClient<Item, C, OwnedReadHalf<UnixStream>, OwnedWriteHalf<UnixStream>>
where
    Item: CollectionMeta,
    C: Codec<Item> + Default,
{
    /// Connects to the socket at `path` with `RpcClient::connect_uds_by_name`,
    /// passing `Item::NAME` (presumably the collection name — confirm against
    /// `CollectionMeta`), and pairs the connection with `C::default()`.
    ///
    /// # Errors
    /// Returns whatever error the underlying connect call reports.
    pub async fn connect_uds(path: impl AsRef<Path>) -> Result<Self> {
        let rpc = RpcClient::connect_uds_by_name(path, Item::NAME).await?;
        let client = Self {
            inner: rpc,
            codec: C::default(),
            _phantom: PhantomData,
        };
        Ok(client)
    }
}
/// Typed operations, available for any transport whose halves implement
/// `AsyncRead` / `AsyncWrite`.
///
/// Every method forwards to the wrapped untyped client: ids are converted
/// with `as_bytes` / `from_bytes`, values with the stored codec. A payload
/// the codec cannot decode is reported as [`RpcError::Encoding`] carrying the
/// codec error's `to_string()` text.
impl<Item, C, R, W> TypedRpcClient<Item, C, R, W>
where
    Item: CollectionMeta,
    C: Codec<Item>,
    R: AsyncRead + Unpin + 'static,
    W: AsyncWrite + Unpin + 'static,
{
    /// Fetches the value stored under `id` and decodes it.
    ///
    /// Returns `Ok(None)` when the inner client returns no payload.
    ///
    /// # Errors
    /// Propagates errors from the inner client; an undecodable payload yields
    /// [`RpcError::Encoding`].
    pub async fn get(&mut self, id: &Item::SelfId) -> Result<Option<Item>> {
        self.inner
            .get(id.as_bytes())
            .await?
            .map(|bytes| self.decode_value(&bytes))
            .transpose()
    }

    /// Forwards to `RpcClient::contains` for `id` and returns its answer
    /// unchanged.
    ///
    /// NOTE(review): the meaning of the returned `u32` is defined by
    /// `armour_rpc` and is not visible here — confirm before relying on it.
    pub async fn contains(&mut self, id: &Item::SelfId) -> Result<Option<u32>> {
        self.inner.contains(id.as_bytes()).await
    }

    /// Returns the entry reported by `RpcClient::first`, decoded into a typed
    /// `(id, item)` pair, or `Ok(None)` when there is none.
    ///
    /// # Errors
    /// Propagates errors from the inner client; an undecodable value yields
    /// [`RpcError::Encoding`].
    pub async fn first(&mut self) -> Result<Option<(Item::SelfId, Item)>> {
        let entry = self.inner.first().await?;
        self.decode_kv(entry)
    }

    /// Returns the entry reported by `RpcClient::last`, decoded into a typed
    /// `(id, item)` pair, or `Ok(None)` when there is none.
    ///
    /// # Errors
    /// Propagates errors from the inner client; an undecodable value yields
    /// [`RpcError::Encoding`].
    pub async fn last(&mut self) -> Result<Option<(Item::SelfId, Item)>> {
        let entry = self.inner.last().await?;
        self.decode_kv(entry)
    }

    /// Requests the entries between `start` and `end` and decodes each one.
    ///
    /// The bounds are converted to byte bounds with `encode_bound` before
    /// being sent. Entries are returned in the order the inner client
    /// delivered them.
    ///
    /// # Errors
    /// Propagates errors from the inner client. The first value that fails to
    /// decode aborts the whole call with [`RpcError::Encoding`].
    pub async fn range(
        &mut self,
        start: Bound<&Item::SelfId>,
        end: Bound<&Item::SelfId>,
    ) -> Result<Vec<(Item::SelfId, Item)>> {
        let start = encode_bound::<Item::SelfId>(start);
        let end = encode_bound::<Item::SelfId>(end);
        let entries = self.inner.range(start, end).await?;
        entries
            .into_iter()
            .map(|(key, val)| {
                // Build the id first, then decode, matching the wire order.
                let id = Item::SelfId::from_bytes(&key);
                self.decode_value(&val).map(|item| (id, item))
            })
            .collect()
    }

    /// Requests only the keys between `start` and `end` and converts each one
    /// back into an id with `from_bytes`.
    ///
    /// # Errors
    /// Propagates errors from the inner client.
    pub async fn range_keys(
        &mut self,
        start: Bound<&Item::SelfId>,
        end: Bound<&Item::SelfId>,
    ) -> Result<Vec<Item::SelfId>> {
        let start = encode_bound::<Item::SelfId>(start);
        let end = encode_bound::<Item::SelfId>(end);
        let keys = self.inner.range_keys(start, end).await?;
        Ok(keys
            .iter()
            .map(|key| Item::SelfId::from_bytes(key))
            .collect())
    }

    /// Forwards to `RpcClient::count`, passing `exact` through unchanged.
    ///
    /// NOTE(review): presumably `exact = false` permits an approximate count —
    /// confirm against the `armour_rpc` documentation.
    pub async fn count(&mut self, exact: bool) -> Result<u64> {
        self.inner.count(exact).await
    }

    /// Encodes `item` and sends it under `id` via `RpcClient::upsert` with no
    /// flag (`None`).
    ///
    /// # Errors
    /// Propagates errors from the inner client.
    pub async fn upsert(&mut self, id: &Item::SelfId, item: &Item) -> Result<()> {
        self.upsert_with(id, None, item).await
    }

    /// Encodes `item` and sends it under `id` via `RpcClient::upsert` with the
    /// flag `Some(false)`.
    ///
    /// NOTE(review): presumably `Some(false)` means "the key must not already
    /// exist" — confirm against the `armour_rpc` documentation.
    ///
    /// # Errors
    /// Propagates errors from the inner client.
    pub async fn insert(&mut self, id: &Item::SelfId, item: &Item) -> Result<()> {
        self.upsert_with(id, Some(false), item).await
    }

    /// Encodes `item` and sends it under `id` via `RpcClient::upsert` with the
    /// flag `Some(true)`.
    ///
    /// NOTE(review): presumably `Some(true)` means "the key must already
    /// exist" — confirm against the `armour_rpc` documentation.
    ///
    /// # Errors
    /// Propagates errors from the inner client.
    pub async fn update(&mut self, id: &Item::SelfId, item: &Item) -> Result<()> {
        self.upsert_with(id, Some(true), item).await
    }

    /// Forwards to `RpcClient::remove` for `id`, always passing `false` as the
    /// second argument.
    ///
    /// NOTE(review): the meaning of that boolean is defined by `armour_rpc`
    /// and is not visible here.
    ///
    /// # Errors
    /// Propagates errors from the inner client.
    pub async fn remove(&mut self, id: &Item::SelfId) -> Result<()> {
        self.inner.remove(id.as_bytes(), false).await
    }

    /// Forwards to `RpcClient::take` for `id` (second argument always `false`)
    /// and decodes the returned payload, if any.
    ///
    /// Returns `Ok(None)` when the inner client returns no payload.
    ///
    /// # Errors
    /// Propagates errors from the inner client; an undecodable payload yields
    /// [`RpcError::Encoding`].
    pub async fn take(&mut self, id: &Item::SelfId) -> Result<Option<Item>> {
        self.inner
            .take(id.as_bytes(), false)
            .await?
            .map(|bytes| self.decode_value(&bytes))
            .transpose()
    }

    /// Encodes a sequence of `(id, optional item)` pairs and sends them in one
    /// `RpcClient::apply_batch` call.
    ///
    /// `Some(item)` is encoded with the codec; `None` is sent as `None`
    /// (presumably a removal of that key — confirm against `armour_rpc`).
    /// Accepts anything iterable (a `Vec`, an array, an iterator, ...).
    ///
    /// # Errors
    /// Propagates errors from the inner client.
    pub async fn apply_batch(
        &mut self,
        items: impl IntoIterator<Item = (Item::SelfId, Option<Item>)>,
    ) -> Result<()> {
        // The whole batch is encoded up front; the shared borrow of `self`
        // taken by the closure ends before the inner client is used.
        let raw: Vec<(Vec<u8>, Option<Vec<u8>>)> = items
            .into_iter()
            .map(|(id, val)| {
                let key = id.as_bytes().to_vec();
                let value = val.map(|v| self.encode_value(&v));
                (key, value)
            })
            .collect();
        self.inner.apply_batch(raw).await
    }

    /// Shared write path for `upsert`, `insert` and `update`: encodes `item`
    /// and calls `RpcClient::upsert` with an explicitly provided key and the
    /// given `flag`. Whatever the inner call returns on success is discarded.
    async fn upsert_with(
        &mut self,
        id: &Item::SelfId,
        flag: Option<bool>,
        item: &Item,
    ) -> Result<()> {
        let value = self.encode_value(item);
        self.inner
            .upsert(UpsertKey::Provided(id.as_bytes().to_vec()), flag, value)
            .await?;
        Ok(())
    }

    /// Decodes an optional raw `(key, value)` pair into a typed pair.
    ///
    /// `None` stays `None`; a value that fails to decode yields
    /// [`RpcError::Encoding`].
    fn decode_kv(&self, data: Option<(Vec<u8>, Vec<u8>)>) -> Result<Option<(Item::SelfId, Item)>> {
        data.map(|(key, val)| {
            let id = Item::SelfId::from_bytes(&key);
            self.decode_value(&val).map(|item| (id, item))
        })
        .transpose()
    }

    /// Decodes one payload with the codec, mapping a codec failure to
    /// [`RpcError::Encoding`] with the failure's `to_string()` text.
    fn decode_value(&self, bytes: &[u8]) -> Result<Item> {
        let item = self
            .codec
            .decode_from(bytes)
            .map_err(|e| RpcError::Encoding(e.to_string()))?;
        Ok(item)
    }

    /// Encodes `item` with the codec into a freshly allocated buffer.
    fn encode_value(&self, item: &Item) -> Vec<u8> {
        let mut buf = Vec::new();
        self.codec.encode_to(item, &mut buf);
        buf
    }
}