use crate::client::Transport;
use crate::error::{ErrKind, Error, Result};
use crate::pb::{self, kv_client::KvClient as PbKvClient};
use crate::utils::build_prefix_end;
use crate::Client;
use helper::*;
#[derive(Debug, Clone)]
pub struct KvClient {
inner: PbKvClient<Transport>,
}
impl KvClient {
pub(crate) fn new(transport: Transport) -> Self {
let inner = PbKvClient::new(transport);
KvClient { inner }
}
pub fn with_client(client: &Client) -> Self {
Self::new(client.transport.clone())
}
pub fn do_range(&mut self, key: impl AsRef<[u8]>) -> DoRangeRequest {
DoRangeRequest::new(key, self)
}
#[inline]
pub async fn get(&mut self, key: impl AsRef<[u8]>) -> Result<Vec<u8>> {
let resp = self.do_range(key).await?;
let kv = resp
.kvs
.first()
.ok_or_else(|| Error::from_kind(ErrKind::KeyNotFound))?;
Ok(kv.value.clone())
}
#[inline]
pub async fn get_string(&mut self, key: impl AsRef<[u8]>) -> Result<String> {
let value = self.get(key).await?;
String::from_utf8(value).map_err(|err| Error::new(ErrKind::InvalidData, err))
}
#[inline]
pub async fn get_with_prefix(&mut self, key: impl AsRef<[u8]>) -> Result<Vec<pb::KeyValue>> {
let resp = self.do_range(key).with_prefix().await?;
Ok(resp.kvs.to_vec())
}
#[inline]
pub async fn all(&mut self) -> Result<Vec<pb::KeyValue>> {
let resp = self.do_range([0x00]).with_range_end(vec![0x00]).await?;
Ok(resp.kvs.to_vec())
}
pub fn do_put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> DoPutRequest {
DoPutRequest::new(key, value, self)
}
pub async fn put_kv(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Result<()> {
self.do_put(key, value).await.map(|_| ())
}
pub fn do_delete_range(&mut self, key: impl AsRef<[u8]>) -> DoDeleteRangeRequest {
DoDeleteRangeRequest::new(key, self)
}
pub async fn delete(&mut self, key: impl AsRef<[u8]>) -> Result<()> {
self.do_delete_range(key).await.map(|_| ())
}
pub fn do_txn(&mut self) -> DoTxnRequest {
DoTxnRequest::new(self)
}
pub fn do_compaction(&mut self, revision: i64, physical: bool) -> DoCompactionRequest {
DoCompactionRequest::new(revision, physical, self)
}
pub async fn compact_history(&mut self, revision: i64, physical: bool) -> Result<()> {
let _resp = self.do_compaction(revision, physical).await?;
Ok(())
}
}
impl pb::RangeRequest {
pub fn new(key: impl AsRef<[u8]>) -> Self {
pb::RangeRequest {
key: key.as_ref().to_vec(),
..Default::default()
}
}
}
impl DoRangeRequest {
pub fn new(key: impl AsRef<[u8]>, client: &KvClient) -> Self {
DoRangeRequest {
request: pb::RangeRequest::new(key),
client: client.clone(),
}
}
pub fn with_prefix(mut self) -> Self {
self.request.range_end = build_prefix_end(&self.request.key);
self
}
}
impl pb::PutRequest {
pub fn new(key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Self {
pb::PutRequest {
key: key.as_ref().to_vec(),
value: value.as_ref().to_vec(),
..Default::default()
}
}
}
impl DoPutRequest {
pub fn new(key: impl AsRef<[u8]>, value: impl AsRef<[u8]>, client: &KvClient) -> Self {
DoPutRequest {
request: pb::PutRequest::new(key, value),
client: client.clone(),
}
}
}
impl pb::DeleteRangeRequest {
pub fn new(key: impl AsRef<[u8]>) -> Self {
pb::DeleteRangeRequest {
key: key.as_ref().to_vec(),
..Default::default()
}
}
}
impl DoDeleteRangeRequest {
pub fn new(key: impl AsRef<[u8]>, client: &KvClient) -> Self {
DoDeleteRangeRequest {
request: pb::DeleteRangeRequest::new(key),
client: client.clone(),
}
}
pub fn with_prefix(mut self) -> Self {
self.request.range_end = build_prefix_end(&self.request.key);
self
}
}
impl pb::Compare {
pub fn new(
key: impl AsRef<[u8]>,
result: pb::compare::CompareResult,
target_union: pb::compare::TargetUnion,
) -> Self {
let target = match target_union {
pb::compare::TargetUnion::Version(_) => pb::compare::CompareTarget::Version,
pb::compare::TargetUnion::CreateRevision(_) => pb::compare::CompareTarget::Create,
pb::compare::TargetUnion::ModRevision(_) => pb::compare::CompareTarget::Mod,
pb::compare::TargetUnion::Value(_) => pb::compare::CompareTarget::Value,
pb::compare::TargetUnion::Lease(_) => pb::compare::CompareTarget::Lease,
};
pb::Compare {
key: key.as_ref().to_vec(),
result: result.into(),
target: target.into(),
target_union: Some(target_union),
..Default::default()
}
}
pub fn with_range_end(mut self, end: impl AsRef<[u8]>) -> Self {
self.range_end = end.as_ref().to_vec();
self
}
pub fn with_prefix(mut self) -> Self {
self.range_end = build_prefix_end(&self.key);
self
}
}
impl pb::TxnRequest {
pub fn new() -> Self {
pb::TxnRequest {
..Default::default()
}
}
pub fn with_if(mut self, cmps: Vec<pb::Compare>) -> Self {
self.compare = cmps;
self
}
pub fn with_then(mut self, ops: Vec<pb::RequestOp>) -> Self {
self.success = ops;
self
}
pub fn with_else(mut self, ops: Vec<pb::RequestOp>) -> Self {
self.failure = ops;
self
}
}
impl From<pb::RangeRequest> for pb::RequestOp {
fn from(request: pb::RangeRequest) -> Self {
let request_op = pb::request_op::Request::RequestRange(request);
pb::RequestOp {
request: Some(request_op),
}
}
}
impl From<pb::PutRequest> for pb::RequestOp {
fn from(request: pb::PutRequest) -> Self {
let request_op = pb::request_op::Request::RequestPut(request);
pb::RequestOp {
request: Some(request_op),
}
}
}
impl From<pb::DeleteRangeRequest> for pb::RequestOp {
fn from(request: pb::DeleteRangeRequest) -> Self {
let request_op = pb::request_op::Request::RequestDeleteRange(request);
pb::RequestOp {
request: Some(request_op),
}
}
}
impl From<pb::TxnRequest> for pb::RequestOp {
fn from(request: pb::TxnRequest) -> Self {
let request_op = pb::request_op::Request::RequestTxn(request);
pb::RequestOp {
request: Some(request_op),
}
}
}
impl DoTxnRequest {
pub fn new(client: &KvClient) -> Self {
DoTxnRequest {
request: pb::TxnRequest {
..Default::default()
},
client: client.clone(),
}
}
pub fn with_if(mut self, cmps: Vec<pb::Compare>) -> Self {
self.request = self.request.with_if(cmps);
self
}
pub fn with_then(mut self, ops: Vec<pb::RequestOp>) -> Self {
self.request = self.request.with_then(ops);
self
}
pub fn with_else(mut self, ops: Vec<pb::RequestOp>) -> Self {
self.request = self.request.with_else(ops);
self
}
}
impl pb::CompactionRequest {
pub fn new(revision: i64, physical: bool) -> Self {
pb::CompactionRequest { revision, physical }
}
}
impl DoCompactionRequest {
pub fn new(revision: i64, physical: bool, client: &KvClient) -> Self {
DoCompactionRequest {
request: pb::CompactionRequest::new(revision, physical),
client: client.clone(),
}
}
}
mod helper {
#![allow(dead_code)]
use crate::error::Result;
use crate::kv::KvClient;
use crate::pb;
include!("pb/kv_helper.rs");
}