use std::sync::Arc;
use arc_swap::ArcSwap;
use bytes::Bytes;
use d_engine_core::ScanResult;
use d_engine_core::client::ErrorCode;
use d_engine_core::client::KvEntry;
use d_engine_core::config::ReadConsistencyPolicy;
use d_engine_proto::client::ClientReadRequest;
use d_engine_proto::client::ClientWriteRequest;
use d_engine_proto::client::MembershipSnapshot;
use d_engine_proto::client::ScanRequest;
use d_engine_proto::client::WatchMembershipRequest;
use d_engine_proto::client::WatchRequest;
use d_engine_proto::client::WatchResponse;
use d_engine_proto::client::WriteCommand;
use d_engine_proto::client::raft_client_service_client::RaftClientServiceClient;
use rand::Rng;
use rand::SeedableRng;
use rand::rngs::StdRng;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use tracing::debug;
use tracing::error;
use tracing::warn;
use super::ClientInner;
use crate::ClientApiError;
use crate::ClientResponseExt;
use crate::scoped_timer::ScopedTimer;
use d_engine_core::client::{ClientApi, ClientApiResult};
/// gRPC-backed client handle.
///
/// Cloning is cheap: the only field is an `Arc`, so every clone shares the same
/// swappable `ClientInner`.
#[derive(Clone)]
pub struct GrpcClient {
/// Shared, atomically swappable client state. Methods load a snapshot from it
/// to read the `client_id` and the connection `pool`.
pub(super) client_inner: Arc<ArcSwap<ClientInner>>,
}
impl GrpcClient {
pub(crate) fn new(client_inner: Arc<ArcSwap<ClientInner>>) -> Self {
Self { client_inner }
}
/// Reads a single key, optionally overriding the read consistency policy.
///
/// Implemented as a one-element call to `get_multi_with_policy`.
///
/// # Returns
/// `Ok(Some(entry))` when the read produced an entry for `key`, `Ok(None)` otherwise.
///
/// # Errors
/// Propagates any error returned by `get_multi_with_policy`.
pub async fn get_with_policy(
    &self,
    key: impl AsRef<[u8]>,
    consistency_policy: Option<ReadConsistencyPolicy>,
) -> std::result::Result<Option<KvEntry>, ClientApiError> {
    let single_key = std::iter::once(key);
    let mut entries = self.get_multi_with_policy(single_key, consistency_policy).await?;
    // An empty result vector and an explicit `None` slot both collapse to `None`.
    Ok(entries.pop().flatten())
}
/// Reads several keys in one request and returns one slot per requested key,
/// in the same order as `keys`.
///
/// Routing:
/// - `LinearizableRead`, `LeaseRead`, and `None` are sent to the leader channel.
/// - `EventualConsistency` is sent to a randomly selected pool channel.
///
/// The server response is treated as sparse: entries are indexed by key and then
/// re-aligned against the requested keys, so a key with no returned entry yields
/// `None` in its slot.
///
/// # Errors
/// - `ErrorCode::InvalidRequest` when `keys` is empty.
/// - Any error from building the client, the gRPC `Status` of a failed call, or
///   an error reported by `into_read_results`.
pub async fn get_multi_with_policy(
    &self,
    keys: impl IntoIterator<Item = impl AsRef<[u8]>>,
    consistency_policy: Option<ReadConsistencyPolicy>,
) -> std::result::Result<Vec<Option<KvEntry>>, ClientApiError> {
    let _timer = ScopedTimer::new("client::get_multi");
    let keys: Vec<Bytes> =
        keys.into_iter().map(|k| Bytes::copy_from_slice(k.as_ref())).collect();
    if keys.is_empty() {
        warn!("Attempted multi-get with empty key collection");
        return Err(ErrorCode::InvalidRequest.into());
    }
    // Copy the id out in a single statement so the `ArcSwap` guard is released
    // immediately instead of being held across the awaits below.
    let client_id = self.client_inner.load().client_id;
    // `keys` is moved into the request; keep a copy to restore request order later.
    let keys_for_alignment = keys.clone();
    let request = ClientReadRequest {
        client_id,
        keys,
        consistency_policy: consistency_policy
            .clone()
            .map(|p| d_engine_proto::client::ReadConsistencyPolicy::from(p) as i32),
    };
    // NOTE(review): `None` (no explicit policy) is routed to the leader here;
    // the log messages below describe this actual routing. Confirm that
    // leader routing for the cluster-default case is the intended behavior.
    let mut client = match consistency_policy {
        Some(ReadConsistencyPolicy::LinearizableRead)
        | Some(ReadConsistencyPolicy::LeaseRead)
        | None => {
            debug!("Using leader client for linearizable, lease, or cluster-default read");
            self.make_leader_client().await?
        }
        Some(ReadConsistencyPolicy::EventualConsistency) => {
            debug!("Using load-balanced client for eventual-consistency read");
            self.make_client().await?
        }
    };
    match client.handle_client_read(request).await {
        Ok(response) => {
            debug!("Read response: {:?}", response);
            let sparse = response.into_inner().into_read_results()?;
            // Index the returned entries by key, skipping empty slots.
            let results_by_key: std::collections::HashMap<Bytes, _> = sparse
                .into_iter()
                .flatten()
                .map(|entry| (entry.key.clone(), entry))
                .collect();
            // Re-align against the requested keys so the output is positional.
            Ok(keys_for_alignment.iter().map(|k| results_by_key.get(k).cloned()).collect())
        }
        Err(status) => {
            error!("Read request failed: {:?}", status);
            Err(status.into())
        }
    }
}
/// Builds a service client bound to the pool's current leader channel.
///
/// Gzip is enabled for both sending and accepting when the pool configuration
/// has `enable_compression` set. As written, this never returns `Err`.
async fn make_leader_client(
    &self
) -> std::result::Result<RaftClientServiceClient<Channel>, ClientApiError> {
    let inner = self.client_inner.load();
    let leader_client = RaftClientServiceClient::new(inner.pool.get_leader());
    if !inner.pool.config.enable_compression {
        return Ok(leader_client);
    }
    Ok(leader_client
        .send_compressed(CompressionEncoding::Gzip)
        .accept_compressed(CompressionEncoding::Gzip))
}
pub(super) async fn make_client(
&self
) -> std::result::Result<RaftClientServiceClient<Channel>, ClientApiError> {
let client_inner = self.client_inner.load();
let mut rng = StdRng::from_os_rng();
let channels = client_inner.pool.get_all_channels();
let i = rng.random_range(0..channels.len());
let mut client = RaftClientServiceClient::new(channels[i].clone());
if client_inner.pool.config.enable_compression {
client = client
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip);
}
Ok(client)
}
/// Opens a server-streaming subscription to cluster membership snapshots.
///
/// The request is sent through `make_client`, i.e. over one of the pool's channels.
///
/// # Errors
/// Returns the error from `make_client`, or the gRPC `Status` converted into
/// `ClientApiError` when the stream cannot be established.
pub async fn watch_membership(&self) -> ClientApiResult<tonic::Streaming<MembershipSnapshot>> {
    // Copy the id out in a single statement so the `ArcSwap` guard is released
    // immediately instead of being held across the awaits below.
    let client_id = self.client_inner.load().client_id;
    let request = WatchMembershipRequest { client_id };
    let mut client = self.make_client().await?;
    match client.watch_membership(request).await {
        Ok(response) => {
            debug!("Membership watch stream established");
            Ok(response.into_inner())
        }
        Err(status) => {
            error!("watch_membership request failed: {:?}", status);
            Err(status.into())
        }
    }
}
/// Opens a server-streaming watch on a single exact key.
///
/// The request is built with `prefix: false` and `prev_kv: false` and is sent
/// through `make_client`, i.e. over one of the pool's channels.
///
/// # Errors
/// Returns the error from `make_client`, or the gRPC `Status` converted into
/// `ClientApiError` when the stream cannot be established.
pub async fn watch(
    &self,
    key: impl AsRef<[u8]>,
) -> ClientApiResult<tonic::Streaming<WatchResponse>> {
    // Copy the id out in a single statement so the `ArcSwap` guard is released
    // immediately instead of being held across the awaits below.
    let client_id = self.client_inner.load().client_id;
    let request = WatchRequest {
        client_id,
        key: Bytes::copy_from_slice(key.as_ref()),
        prefix: false,
        prev_kv: false,
    };
    let mut client = self.make_client().await?;
    match client.watch(request).await {
        Ok(response) => {
            debug!("Watch stream established");
            Ok(response.into_inner())
        }
        Err(status) => {
            error!("Watch request failed: {:?}", status);
            Err(status.into())
        }
    }
}
/// Opens a server-streaming watch on every key that starts with `prefix`.
///
/// The request reuses `WatchRequest` with the prefix bytes in `key`,
/// `prefix: true`, and `prev_kv: false`, and is sent through `make_client`.
///
/// # Errors
/// Returns the error from `make_client`, or the gRPC `Status` converted into
/// `ClientApiError` when the stream cannot be established.
pub async fn watch_prefix(
    &self,
    prefix: impl AsRef<[u8]>,
) -> ClientApiResult<tonic::Streaming<WatchResponse>> {
    // Copy the id out in a single statement so the `ArcSwap` guard is released
    // immediately instead of being held across the awaits below.
    let client_id = self.client_inner.load().client_id;
    let request = WatchRequest {
        client_id,
        key: Bytes::copy_from_slice(prefix.as_ref()),
        prefix: true,
        prev_kv: false,
    };
    let mut client = self.make_client().await?;
    match client.watch(request).await {
        Ok(response) => {
            debug!("Prefix watch stream established");
            Ok(response.into_inner())
        }
        Err(status) => {
            error!("Prefix watch request failed: {:?}", status);
            Err(status.into())
        }
    }
}
}
#[async_trait::async_trait]
impl ClientApi for GrpcClient {
/// Writes `value` under `key` by sending an insert command to the leader.
///
/// # Errors
/// Returns the error from `make_leader_client`, the gRPC `Status` of a failed
/// call, or whatever `validate_error` reports for the server's response.
async fn put(
    &self,
    key: impl AsRef<[u8]> + Send,
    value: impl AsRef<[u8]> + Send,
) -> ClientApiResult<()> {
    let _timer = ScopedTimer::new("client::put");
    // Copy the id out in a single statement so the `ArcSwap` guard is released
    // immediately instead of being held across the awaits below.
    let client_id = self.client_inner.load().client_id;
    let command = WriteCommand::insert(
        Bytes::copy_from_slice(key.as_ref()),
        Bytes::copy_from_slice(value.as_ref()),
    );
    let request = ClientWriteRequest {
        client_id,
        command: Some(command),
    };
    let mut client = self.make_leader_client().await?;
    match client.handle_client_write(request).await {
        Ok(response) => {
            debug!("[:GrpcClient:write] response: {:?}", response);
            response.get_ref().validate_error()
        }
        Err(status) => {
            error!("[:GrpcClient:write] status: {:?}", status);
            Err(ClientApiError::from(status))
        }
    }
}
/// Writes `value` under `key` with a time-to-live of `ttl_secs`, by sending an
/// insert-with-TTL command to the leader.
///
/// # Errors
/// Returns the error from `make_leader_client`, the gRPC `Status` of a failed
/// call, or whatever `validate_error` reports for the server's response.
async fn put_with_ttl(
    &self,
    key: impl AsRef<[u8]> + Send,
    value: impl AsRef<[u8]> + Send,
    ttl_secs: u64,
) -> ClientApiResult<()> {
    let _timer = ScopedTimer::new("client::put_with_ttl");
    // Copy the id out in a single statement so the `ArcSwap` guard is released
    // immediately instead of being held across the awaits below.
    let client_id = self.client_inner.load().client_id;
    let command = WriteCommand::insert_with_ttl(
        Bytes::copy_from_slice(key.as_ref()),
        Bytes::copy_from_slice(value.as_ref()),
        ttl_secs,
    );
    let request = ClientWriteRequest {
        client_id,
        command: Some(command),
    };
    let mut client = self.make_leader_client().await?;
    match client.handle_client_write(request).await {
        Ok(response) => {
            debug!("[:GrpcClient:put_with_ttl] response: {:?}", response);
            response.get_ref().validate_error()
        }
        Err(status) => {
            error!("[:GrpcClient:put_with_ttl] status: {:?}", status);
            Err(ClientApiError::from(status))
        }
    }
}
/// Reads the value stored under `key` without specifying a consistency policy.
///
/// # Returns
/// `Ok(Some(value))` when an entry was returned, `Ok(None)` otherwise.
///
/// # Errors
/// Propagates any error from `get_with_policy`.
async fn get(
    &self,
    key: impl AsRef<[u8]> + Send,
) -> ClientApiResult<Option<Bytes>> {
    // `?` propagates the error unchanged; only the value is kept from the entry.
    let entry = self.get_with_policy(key, None).await?;
    Ok(entry.map(|kv| kv.value))
}
/// Reads several keys without specifying a consistency policy.
///
/// # Returns
/// One slot per requested key, in request order; a slot is `None` when no
/// entry was returned for that key.
///
/// # Errors
/// Propagates any error from the inherent `get_multi_with_policy`
/// (including the invalid-request error for an empty `keys` slice).
async fn get_multi(
    &self,
    keys: &[Bytes],
) -> ClientApiResult<Vec<Option<Bytes>>> {
    let entries = self.get_multi_with_policy(keys.iter().cloned(), None).await?;
    // Strip each entry down to its value, preserving positions.
    Ok(entries.into_iter().map(|slot| slot.map(|kv| kv.value)).collect())
}
/// Deletes `key` by sending a delete command to the leader.
///
/// # Errors
/// Returns the error from `make_leader_client`, the gRPC `Status` of a failed
/// call, or whatever `validate_error` reports for the server's response.
async fn delete(
    &self,
    key: impl AsRef<[u8]> + Send,
) -> ClientApiResult<()> {
    // Copy the id out in a single statement so the `ArcSwap` guard is released
    // immediately instead of being held across the awaits below.
    let client_id = self.client_inner.load().client_id;
    let command = WriteCommand::delete(Bytes::copy_from_slice(key.as_ref()));
    let request = ClientWriteRequest {
        client_id,
        command: Some(command),
    };
    let mut client = self.make_leader_client().await?;
    match client.handle_client_write(request).await {
        Ok(response) => {
            debug!("[:GrpcClient:delete] response: {:?}", response);
            response.get_ref().validate_error()
        }
        Err(status) => {
            error!("[:GrpcClient:delete] status: {:?}", status);
            Err(ClientApiError::from(status))
        }
    }
}
/// Sends a compare-and-swap command for `key` to the leader.
///
/// `expected_value` is forwarded as an optional byte string and `new_value`
/// as the replacement; the comparison itself is performed by the server.
///
/// # Returns
/// The response's `is_write_success()` flag once `validate_error` has passed
/// (presumably `true` when the swap was applied; confirm against the server
/// contract).
///
/// # Errors
/// Returns the error from `make_leader_client`, the gRPC `Status` of a failed
/// call, or whatever `validate_error` reports for the server's response.
async fn compare_and_swap(
    &self,
    key: impl AsRef<[u8]> + Send,
    expected_value: Option<impl AsRef<[u8]> + Send>,
    new_value: impl AsRef<[u8]> + Send,
) -> ClientApiResult<bool> {
    // Copy the id out in a single statement so the `ArcSwap` guard is released
    // immediately instead of being held across the awaits below.
    let client_id = self.client_inner.load().client_id;
    let expected = expected_value.map(|v| Bytes::copy_from_slice(v.as_ref()));
    let command = WriteCommand::compare_and_swap(
        Bytes::copy_from_slice(key.as_ref()),
        expected,
        Bytes::copy_from_slice(new_value.as_ref()),
    );
    let request = ClientWriteRequest {
        client_id,
        command: Some(command),
    };
    let mut client = self.make_leader_client().await?;
    match client.handle_client_write(request).await {
        Ok(response) => {
            debug!("[:GrpcClient:compare_and_swap] response: {:?}", response);
            let client_response = response.get_ref();
            client_response.validate_error()?;
            Ok(client_response.is_write_success())
        }
        Err(status) => {
            error!("[:GrpcClient:compare_and_swap] status: {:?}", status);
            Err(ClientApiError::from(status))
        }
    }
}
/// Returns the cluster members currently known to the connection pool.
///
/// This reads the pool's cached view; no request is sent. As written, it
/// never returns `Err`.
async fn list_members(
    &self
) -> ClientApiResult<Vec<d_engine_proto::server::cluster::NodeMeta>> {
    let members = self.client_inner.load().pool.get_all_members();
    Ok(members)
}
/// Returns the leader id currently recorded by the connection pool, if any.
///
/// This reads the pool's cached view; no request is sent. As written, it
/// never returns `Err`.
async fn get_leader_id(&self) -> ClientApiResult<Option<u32>> {
    let leader_id = self.client_inner.load().pool.get_leader_id();
    Ok(leader_id)
}
/// Trait-level multi-key read with an optional consistency policy.
///
/// Delegates to the inherent `GrpcClient::get_multi_with_policy` and keeps only
/// the value of each returned entry.
///
/// # Returns
/// One slot per requested key, in request order; `None` where no entry was returned.
///
/// # Errors
/// Propagates any error from the inherent method unchanged.
async fn get_multi_with_policy(
    &self,
    keys: &[Bytes],
    consistency_policy: Option<ReadConsistencyPolicy>,
) -> ClientApiResult<Vec<Option<Bytes>>> {
    // Fully qualified so the inherent method (not this trait method) is called.
    let entries =
        <Self>::get_multi_with_policy(self, keys.iter().cloned(), consistency_policy).await?;
    let values = entries.into_iter().map(|slot| slot.map(|kv| kv.value)).collect();
    Ok(values)
}
/// Reads `key` with the `LinearizableRead` consistency policy.
///
/// # Returns
/// `Ok(Some(value))` when an entry was returned, `Ok(None)` otherwise.
///
/// # Errors
/// Propagates any error from `get_with_policy`.
async fn get_linearizable(
    &self,
    key: impl AsRef<[u8]> + Send,
) -> ClientApiResult<Option<Bytes>> {
    let policy = Some(ReadConsistencyPolicy::LinearizableRead);
    let entry = self.get_with_policy(key, policy).await?;
    Ok(entry.map(|kv| kv.value))
}
/// Reads `key` with the `LeaseRead` consistency policy.
///
/// # Returns
/// `Ok(Some(value))` when an entry was returned, `Ok(None)` otherwise.
///
/// # Errors
/// Propagates any error from `get_with_policy`.
async fn get_lease(
    &self,
    key: impl AsRef<[u8]> + Send,
) -> ClientApiResult<Option<Bytes>> {
    let policy = Some(ReadConsistencyPolicy::LeaseRead);
    let entry = self.get_with_policy(key, policy).await?;
    Ok(entry.map(|kv| kv.value))
}
/// Reads `key` with the `EventualConsistency` policy.
///
/// # Returns
/// `Ok(Some(value))` when an entry was returned, `Ok(None)` otherwise.
///
/// # Errors
/// Propagates any error from `get_with_policy`.
async fn get_eventual(
    &self,
    key: impl AsRef<[u8]> + Send,
) -> ClientApiResult<Option<Bytes>> {
    let policy = Some(ReadConsistencyPolicy::EventualConsistency);
    let entry = self.get_with_policy(key, policy).await?;
    Ok(entry.map(|kv| kv.value))
}
/// Scans all entries whose key starts with `prefix`, via the leader.
///
/// # Returns
/// A `ScanResult` holding the `(key, value)` pairs from the response together
/// with the `revision` reported by the server.
///
/// # Errors
/// Returns the error from `make_leader_client`, or the gRPC `Status` of a
/// failed call converted into `ClientApiError`.
async fn scan_prefix(
    &self,
    prefix: impl AsRef<[u8]> + Send,
) -> ClientApiResult<ScanResult> {
    // Copy the id out in a single statement so the `ArcSwap` guard is released
    // immediately instead of being held across the awaits below.
    let client_id = self.client_inner.load().client_id;
    let request = ScanRequest {
        client_id,
        prefix: Bytes::copy_from_slice(prefix.as_ref()),
    };
    let mut client = self.make_leader_client().await?;
    let response = client
        .handle_client_scan(request)
        .await
        .map_err(ClientApiError::from)?
        .into_inner();
    Ok(ScanResult {
        entries: response.entries.into_iter().map(|e| (e.key, e.value)).collect(),
        revision: response.revision,
    })
}
}