use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
use derive_new::new;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::pd::RetryClient;
use crate::proto::keyspacepb;
use crate::proto::metapb::RegionEpoch;
use crate::proto::metapb::{self};
use crate::region::RegionId;
use crate::region::RegionWithLeader;
use crate::store::KvConnect;
use crate::store::RegionStore;
use crate::store::Request;
use crate::store::{KvClient, Store};
use crate::Config;
use crate::Error;
use crate::Key;
use crate::Result;
use crate::Timestamp;
/// Builds a [`PdRpcClient`] that is wired entirely to the mocks in this module.
///
/// The client is created from `Config::default()`. Its KV connector closure
/// always yields [`MockKvConnect`], and its cluster closure immediately resolves
/// to a [`RetryClient`] built with `new_with_cluster` around [`MockCluster`],
/// using the timeout taken from the same default configuration.
///
/// # Panics
///
/// Panics if `PdRpcClient::new` resolves to an error.
pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
    let config = Config::default();
    // The closures are written inline so their parameter types are inferred
    // from the signature of `PdRpcClient::new`.
    let built = PdRpcClient::new(
        config.clone(),
        |_| MockKvConnect,
        |mgr| {
            let retry_client = RetryClient::new_with_cluster(mgr, config.timeout, MockCluster);
            futures::future::ok(retry_client)
        },
        // NOTE(review): the meaning of this flag is defined by `PdRpcClient::new`,
        // which is not visible here.
        false,
    )
    .await;
    built.unwrap()
}
/// A mock KV client whose request handling is delegated to an optional hook.
///
/// `#[derive(new)]` generates `MockKvClient::new(addr, dispatch)`, and the
/// derived `Default` produces an empty address with no hook installed.
/// Cloning shares the same hook, because it is stored behind an `Arc`.
// The hook's type is spelled out inline on the field below, which is what the
// `type_complexity` lint would otherwise complain about.
#[allow(clippy::type_complexity)]
#[derive(new, Default, Clone)]
pub struct MockKvClient {
/// Address this client was created for; empty when built via
/// `with_dispatch_hook` or `Default`.
pub addr: String,
/// Hook invoked with each request as `&dyn Any`; it returns the boxed
/// response or an error. `None` means no hook has been installed.
dispatch: Option<Arc<dyn Fn(&dyn Any) -> Result<Box<dyn Any>> + Send + Sync + 'static>>,
}
impl MockKvClient {
    /// Creates a mock client that answers every request through `dispatch`.
    ///
    /// The hook receives each request as `&dyn Any` and returns either a boxed
    /// response or an error. The resulting client has an empty `addr`.
    pub fn with_dispatch_hook<F>(dispatch: F) -> MockKvClient
    where
        F: Fn(&dyn Any) -> Result<Box<dyn Any>> + Send + Sync + 'static,
    {
        // Start from the derived default (empty address, no hook) and install
        // the hook on top of it.
        Self {
            dispatch: Some(Arc::new(dispatch)),
            ..Self::default()
        }
    }
}
/// Unit connector type; its `KvConnect` implementation in this file returns a
/// [`MockKvClient`] carrying the requested address and no dispatch hook.
pub struct MockKvConnect;
/// Unit type handed to `RetryClient::new_with_cluster` by `pd_rpc_client` as
/// the cluster stand-in.
// NOTE(review): no trait implementations for this type are visible in this
// part of the file; presumably they live elsewhere — confirm.
pub struct MockCluster;
/// A mock PD client that serves a fixed set of three regions.
///
/// `#[derive(new)]` generates `MockPdClient::new(client)`.
#[derive(new)]
pub struct MockPdClient {
/// KV client that is cloned into every `RegionStore` and `Store` this mock
/// hands out.
client: MockKvClient,
}
#[async_trait]
impl KvClient for MockKvClient {
    /// Forwards `req` (as `&dyn Any`) to the installed dispatch hook and
    /// returns whatever the hook returns.
    ///
    /// # Panics
    ///
    /// Panics with `"no dispatch hook set"` when the client has no hook.
    async fn dispatch(&self, req: &dyn Request) -> Result<Box<dyn Any>> {
        if let Some(hook) = self.dispatch.as_ref() {
            return hook(req.as_any());
        }
        panic!("no dispatch hook set")
    }
}
#[async_trait]
impl KvConnect for MockKvConnect {
    type KvClient = MockKvClient;

    /// Always succeeds, returning a [`MockKvClient`] that records `address`
    /// and has no dispatch hook installed.
    async fn connect(&self, address: &str) -> Result<Self::KvClient> {
        let client = MockKvClient {
            addr: String::from(address),
            dispatch: None,
        };
        Ok(client)
    }
}
impl MockPdClient {
pub fn default() -> MockPdClient {
MockPdClient {
client: MockKvClient::default(),
}
}
pub fn region1() -> RegionWithLeader {
let mut region = RegionWithLeader::default();
region.region.id = 1;
region.region.start_key = vec![];
region.region.end_key = vec![10];
region.region.region_epoch = Some(RegionEpoch {
conf_ver: 0,
version: 0,
});
let leader = metapb::Peer {
store_id: 41,
..Default::default()
};
region.leader = Some(leader);
region
}
pub fn region2() -> RegionWithLeader {
let mut region = RegionWithLeader::default();
region.region.id = 2;
region.region.start_key = vec![10];
region.region.end_key = vec![250, 250];
region.region.region_epoch = Some(RegionEpoch {
conf_ver: 0,
version: 0,
});
let leader = metapb::Peer {
store_id: 42,
..Default::default()
};
region.leader = Some(leader);
region
}
pub fn region3() -> RegionWithLeader {
let mut region = RegionWithLeader::default();
region.region.id = 3;
region.region.start_key = vec![250, 250];
region.region.end_key = vec![];
region.region.region_epoch = Some(RegionEpoch {
conf_ver: 0,
version: 0,
});
let leader = metapb::Peer {
store_id: 43,
..Default::default()
};
region.leader = Some(leader);
region
}
}
#[async_trait]
impl PdClient for MockPdClient {
type KvClient = MockKvClient;
async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
Ok(RegionStore::new(region, Arc::new(self.client.clone())))
}
async fn region_for_key(&self, key: &Key) -> Result<RegionWithLeader> {
let bytes: &[_] = key.into();
let region = if bytes.is_empty() || bytes < &[10][..] {
Self::region1()
} else if bytes >= &[10][..] && bytes < &[250, 250][..] {
Self::region2()
} else {
Self::region3()
};
Ok(region)
}
async fn region_for_id(&self, id: RegionId) -> Result<RegionWithLeader> {
match id {
1 => Ok(Self::region1()),
2 => Ok(Self::region2()),
3 => Ok(Self::region3()),
_ => Err(Error::RegionNotFoundInResponse { region_id: id }),
}
}
async fn all_stores(&self) -> Result<Vec<Store>> {
Ok(vec![Store::new(Arc::new(self.client.clone()))])
}
async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
Ok(Timestamp::default())
}
async fn update_safepoint(self: Arc<Self>, _safepoint: u64) -> Result<bool> {
unimplemented!()
}
async fn update_leader(
&self,
_ver_id: crate::region::RegionVerId,
_leader: metapb::Peer,
) -> Result<()> {
todo!()
}
async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}
async fn invalidate_store_cache(&self, _store_id: crate::region::StoreId) {}
async fn load_keyspace(&self, _keyspace: &str) -> Result<keyspacepb::KeyspaceMeta> {
unimplemented!()
}
}