use engula_api::{server::v1::*, v1::*};
use tonic::transport::Channel;
#[derive(Debug, Clone)]
pub struct Client {
client: node_client::NodeClient<Channel>,
}
impl Client {
pub fn new(channel: Channel) -> Self {
Client {
client: node_client::NodeClient::new(channel),
}
}
pub async fn connect(addr: String) -> Result<Self, tonic::transport::Error> {
let addr = format!("http://{}", addr);
let client = node_client::NodeClient::connect(addr).await?;
Ok(Self { client })
}
pub async fn get_root(&self) -> Result<RootDesc, tonic::Status> {
let mut client = self.client.clone();
let req = GetRootRequest::default();
let res = client.get_root(req).await?;
Ok(res.into_inner().root.unwrap_or_default())
}
pub async fn create_replica(
&self,
replica_id: u64,
group_desc: GroupDesc,
) -> Result<(), tonic::Status> {
let mut client = self.client.clone();
let req = CreateReplicaRequest {
replica_id,
group: Some(group_desc),
};
client.create_replica(req).await?;
Ok(())
}
pub async fn remove_replica(
&self,
replica_id: u64,
group: GroupDesc,
) -> Result<(), tonic::Status> {
let mut client = self.client.clone();
let req = RemoveReplicaRequest {
replica_id,
group: Some(group),
};
client.remove_replica(req).await?;
Ok(())
}
pub async fn batch_group_requests(
&self,
req: BatchRequest,
) -> Result<Vec<GroupResponse>, tonic::Status> {
let mut client = self.client.clone();
let res = client.batch(req).await?;
Ok(res.into_inner().responses)
}
pub async fn root_heartbeat(
&self,
req: HeartbeatRequest,
) -> Result<HeartbeatResponse, tonic::Status> {
let mut client = self.client.clone();
let res = client.root_heartbeat(req).await?;
Ok(res.into_inner())
}
pub async fn pull(
&self,
req: PullRequest,
) -> Result<tonic::Streaming<ShardChunk>, tonic::Status> {
let mut client = self.client.clone();
let res = client.pull(req).await?;
Ok(res.into_inner())
}
pub async fn forward(&self, req: ForwardRequest) -> Result<ForwardResponse, tonic::Status> {
let mut client = self.client.clone();
let res = client.forward(req).await?;
Ok(res.into_inner())
}
pub async fn migrate(&self, req: MigrateRequest) -> Result<MigrateResponse, tonic::Status> {
let mut client = self.client.clone();
let res = client.migrate(req).await?;
Ok(res.into_inner())
}
}
#[derive(Debug, Clone)]
pub struct RequestBatchBuilder {
node_id: u64,
requests: Vec<GroupRequest>,
}
impl RequestBatchBuilder {
pub fn new(node_id: u64) -> Self {
Self {
node_id,
requests: vec![],
}
}
pub fn get(mut self, group_id: u64, epoch: u64, shard_id: u64, key: Vec<u8>) -> Self {
self.requests.push(GroupRequest {
group_id,
epoch,
request: Some(GroupRequestUnion {
request: Some(group_request_union::Request::Get(ShardGetRequest {
shard_id,
get: Some(GetRequest { key }),
})),
}),
});
self
}
pub fn put(
mut self,
group_id: u64,
epoch: u64,
shard_id: u64,
key: Vec<u8>,
value: Vec<u8>,
) -> Self {
self.requests.push(GroupRequest {
group_id,
epoch,
request: Some(GroupRequestUnion {
request: Some(group_request_union::Request::Put(ShardPutRequest {
shard_id,
put: Some(PutRequest { key, value }),
})),
}),
});
self
}
pub fn delete(mut self, group_id: u64, epoch: u64, shard_id: u64, key: Vec<u8>) -> Self {
self.requests.push(GroupRequest {
group_id,
epoch,
request: Some(GroupRequestUnion {
request: Some(group_request_union::Request::Delete(ShardDeleteRequest {
shard_id,
delete: Some(DeleteRequest { key }),
})),
}),
});
self
}
pub fn create_shard(mut self, group_id: u64, epoch: u64, shard_desc: ShardDesc) -> Self {
self.requests.push(GroupRequest {
group_id,
epoch,
request: Some(GroupRequestUnion {
request: Some(group_request_union::Request::CreateShard(
CreateShardRequest {
shard: Some(shard_desc),
},
)),
}),
});
self
}
pub fn add_replica(mut self, group_id: u64, epoch: u64, replica_id: u64, node_id: u64) -> Self {
let change_replicas = ChangeReplicasRequest {
change_replicas: Some(ChangeReplicas {
changes: vec![ChangeReplica {
change_type: ChangeReplicaType::Add.into(),
replica_id,
node_id,
}],
}),
};
self.requests.push(GroupRequest {
group_id,
epoch,
request: Some(GroupRequestUnion {
request: Some(group_request_union::Request::ChangeReplicas(
change_replicas,
)),
}),
});
self
}
pub fn add_learner(mut self, group_id: u64, epoch: u64, replica_id: u64, node_id: u64) -> Self {
let change_replicas = ChangeReplicasRequest {
change_replicas: Some(ChangeReplicas {
changes: vec![ChangeReplica {
change_type: ChangeReplicaType::AddLearner.into(),
replica_id,
node_id,
}],
}),
};
self.requests.push(GroupRequest {
group_id,
epoch,
request: Some(GroupRequestUnion {
request: Some(group_request_union::Request::ChangeReplicas(
change_replicas,
)),
}),
});
self
}
pub fn remove_replica(mut self, group_id: u64, epoch: u64, replica_id: u64) -> Self {
let change_replicas = ChangeReplicasRequest {
change_replicas: Some(ChangeReplicas {
changes: vec![ChangeReplica {
change_type: ChangeReplicaType::Remove.into(),
replica_id,
..Default::default()
}],
}),
};
self.requests.push(GroupRequest {
group_id,
epoch,
request: Some(GroupRequestUnion {
request: Some(group_request_union::Request::ChangeReplicas(
change_replicas,
)),
}),
});
self
}
pub fn accept_shard(
mut self,
group_id: u64,
epoch: u64,
src_group_id: u64,
src_group_epoch: u64,
shard_desc: &ShardDesc,
) -> Self {
self.requests.push(GroupRequest {
group_id,
epoch,
request: Some(GroupRequestUnion {
request: Some(group_request_union::Request::AcceptShard(
AcceptShardRequest {
src_group_id,
src_group_epoch,
shard_desc: Some(shard_desc.to_owned()),
},
)),
}),
});
self
}
pub fn transfer_leader(mut self, group_id: u64, epoch: u64, transferee: u64) -> Self {
self.requests.push(GroupRequest {
group_id,
epoch,
request: Some(GroupRequestUnion {
request: Some(group_request_union::Request::Transfer(TransferRequest {
transferee,
})),
}),
});
self
}
pub fn shard_prefix(mut self, group_id: u64, epoch: u64, shard_id: u64, prefix: &[u8]) -> Self {
self.requests.push(GroupRequest {
group_id,
epoch,
request: Some(GroupRequestUnion {
request: Some(group_request_union::Request::PrefixList(
ShardPrefixListRequest {
shard_id,
prefix: prefix.to_owned(),
},
)),
}),
});
self
}
pub fn build(self) -> BatchRequest {
BatchRequest {
node_id: self.node_id,
requests: self.requests,
}
}
}