use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use derive_new::new;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use super::Request;
use crate::proto::tikvpb::tikv_client::TikvClient;
use crate::Result;
use crate::SecurityManager;
/// Abstraction over how a [`KvClient`] is obtained for a given address.
///
/// Implementors must be `Sized + Send + Sync + 'static`, so a connector can be
/// shared between threads and kept for the lifetime of the program.
#[async_trait]
pub trait KvConnect: Sized + Send + Sync + 'static {
/// The concrete client type handed out by `connect`. It must itself be
/// cloneable and shareable across threads.
type KvClient: KvClient + Clone + Send + Sync + 'static;
/// Asynchronously establishes a connection to `address` and returns a client
/// for it.
///
/// # Errors
///
/// Returns the crate-level error type when the implementor fails to produce
/// a client; the exact failure conditions are implementation-defined.
async fn connect(&self, address: &str) -> Result<Self::KvClient>;
}
/// [`KvConnect`] implementation that produces [`KvRpcClient`]s backed by the
/// generated `TikvClient` gRPC stub.
///
/// The constructor is generated by `derive_new`.
#[derive(new, Clone)]
pub struct TikvConnect {
/// Used to open the underlying channel to the target address.
security_mgr: Arc<SecurityManager>,
/// Timeout copied into every `KvRpcClient` this connector creates.
timeout: Duration,
/// Value passed to `max_decoding_message_size` on each new gRPC client.
// NOTE(review): presumably a byte count, per tonic's generated-client API — confirm.
grpc_max_decoding_message_size: usize,
}
#[async_trait]
impl KvConnect for TikvConnect {
    type KvClient = KvRpcClient;

    /// Opens a channel to `address` through the security manager and wraps it
    /// in a [`KvRpcClient`] carrying this connector's timeout.
    ///
    /// The gRPC stub is configured with the connector's maximum decoding
    /// message size and with gzip as an accepted compression encoding.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `SecurityManager::connect` reports.
    async fn connect(&self, address: &str) -> Result<KvRpcClient> {
        // Copy the limit out first so the factory closure owns a plain `usize`.
        let max_decoding_size = self.grpc_max_decoding_message_size;

        let rpc_client = self
            .security_mgr
            .connect(address, move |channel| {
                TikvClient::new(channel)
                    .max_decoding_message_size(max_decoding_size)
                    .accept_compressed(CompressionEncoding::Gzip)
            })
            .await?;

        Ok(KvRpcClient::new(rpc_client, self.timeout))
    }
}
/// A client capable of sending a type-erased [`Request`] and returning its
/// response.
#[async_trait]
pub trait KvClient {
/// Sends `req` and returns the response boxed as `dyn Any`.
///
/// The concrete response type is not visible in this signature.
// NOTE(review): callers presumably downcast the box to the response type
// matching the request — confirm against the `Request` implementations.
///
/// # Errors
///
/// Returns the crate-level error type when dispatching fails.
async fn dispatch(&self, req: &dyn Request) -> Result<Box<dyn Any>>;
}
/// [`KvClient`] implementation that forwards requests to a `TikvClient` gRPC
/// stub over a tonic `Channel`.
///
/// The constructor is generated by `derive_new` and takes the fields in
/// declaration order: `KvRpcClient::new(rpc_client, timeout)`.
#[derive(new, Clone)]
pub struct KvRpcClient {
/// The generated gRPC stub that requests are dispatched against.
rpc_client: TikvClient<Channel>,
/// Timeout handed to every request dispatched through this client.
timeout: Duration,
}
#[async_trait]
impl KvClient for KvRpcClient {
    /// Hands this client's gRPC stub and timeout to `request` and awaits the
    /// outcome.
    ///
    /// The request itself drives the call; this method only supplies the
    /// stub and timeout and returns the request's result unchanged.
    async fn dispatch(&self, request: &dyn Request) -> Result<Box<dyn Any>> {
        let client = &self.rpc_client;
        let timeout = self.timeout;
        request.dispatch(client, timeout).await
    }
}