1use log::debug;
2use log::error;
3use rand::rngs::StdRng;
4use rand::Rng;
5use rand::SeedableRng;
6use tonic::codec::CompressionEncoding;
7use tonic::transport::Channel;
8
9use super::ConnectionPool;
10use crate::proto::rpc_service_client::RpcServiceClient;
11use crate::proto::ClientCommand;
12use crate::proto::ClientProposeRequest;
13use crate::proto::ClientReadRequest;
14use crate::proto::ClientRequestError;
15use crate::proto::ClientResponse;
16use crate::proto::ClientResult;
17use crate::Error;
18use crate::Result;
19
20pub struct KvClient {
25 client_id: u32,
26 pool: ConnectionPool,
27}
28
29impl KvClient {
30 pub(crate) fn new(
31 client_id: u32,
32 pool: ConnectionPool,
33 ) -> Self {
34 Self { client_id, pool }
35 }
36
37 pub async fn put(
43 &self,
44 key: impl AsRef<[u8]>,
45 value: impl AsRef<[u8]>,
46 ) -> Result<()> {
47 let mut commands = Vec::new();
49 let client_command_insert = ClientCommand::insert(key, value);
50 commands.push(client_command_insert);
51
52 let request = ClientProposeRequest {
53 client_id: self.client_id,
54 commands,
55 };
56
57 let mut client = self.make_leader_client().await?;
58 match client.handle_client_propose(request).await {
60 Ok(response) => {
61 debug!("[:KvClient:write] response: {:?}", response);
62 match response.get_ref() {
63 ClientResponse { error_code, result: _ } => {
64 if matches!(
65 ClientRequestError::try_from(*error_code).unwrap_or(ClientRequestError::NoError),
66 ClientRequestError::NoError
67 ) {
68 return Ok(());
69 } else {
70 error!("handle_client_propose error_code:{:?}", error_code);
71 }
72 }
73 }
74 }
75 Err(status) => {
76 error!("[:KvClient:write] status: {:?}", status);
77 }
78 }
79 Err(Error::FailedToSendWriteRequestError)
80 }
81
82 pub async fn delete(
94 &self,
95 key: impl AsRef<[u8]>,
96 ) -> Result<()> {
97 let mut commands = Vec::new();
99 let client_command_insert = ClientCommand::delete(key);
100 commands.push(client_command_insert);
101
102 let request = ClientProposeRequest {
103 client_id: self.client_id,
104 commands,
105 };
106
107 let mut client = self.make_leader_client().await?;
108
109 match client.handle_client_propose(request).await {
111 Ok(response) => {
112 debug!("[:KvClient:delete] response: {:?}", response);
113
114 return Ok(());
115 }
116 Err(status) => {
117 error!("[:KvClient:delete] status: {:?}", status);
118 }
119 }
120 Err(Error::FailedToSendWriteRequestError)
121 }
122
123 pub async fn get(
134 &self,
135 key: impl AsRef<[u8]>,
136 linear: bool,
137 ) -> Result<Option<ClientResult>> {
138 let mut results = self.get_multi(std::iter::once(key), linear).await?;
140
141 results.pop().ok_or_else(|| {
143 error!("Internal error: empty results from single-key read");
144 Error::InvalidResponse
145 })
146 }
147 pub async fn get_multi(
160 &self,
161 keys: impl IntoIterator<Item = impl AsRef<[u8]>>,
162 linear: bool,
163 ) -> Result<Vec<Option<ClientResult>>> {
164 let commands: Vec<ClientCommand> = keys.into_iter().map(|k| ClientCommand::get(k.as_ref())).collect();
166
167 if commands.is_empty() {
169 return Err(Error::EmptyKeys);
170 }
171
172 let mut client = if linear {
174 self.make_leader_client().await?
175 } else {
176 self.make_client().await?
177 };
178
179 let request = ClientReadRequest {
181 client_id: self.client_id,
182 linear,
183 commands,
184 };
185
186 match client.handle_client_read(request).await {
188 Ok(response) => {
189 debug!("Read response: {:?}", response);
190 response.into_inner().into_read_results()
191 }
192 Err(status) => {
193 error!("Read request failed: {:?}", status);
194 Err(Error::FailedToSendReadRequestError)
195 }
196 }
197 }
198
199 async fn make_leader_client(&self) -> Result<RpcServiceClient<Channel>> {
200 let channel = self.pool.get_leader();
201 let mut client = RpcServiceClient::new(channel);
202 if self.pool.config.enable_compression {
203 client = client
204 .send_compressed(CompressionEncoding::Gzip)
205 .accept_compressed(CompressionEncoding::Gzip);
206 }
207
208 Ok(client)
209 }
210
211 async fn make_client(&self) -> Result<RpcServiceClient<Channel>> {
212 let mut rng = StdRng::from_entropy();
214 let channels = self.pool.get_all_channels();
215 let i = rng.gen_range(0..channels.len());
216
217 let mut client = RpcServiceClient::new(channels[i].clone());
218
219 if self.pool.config.enable_compression {
220 client = client
221 .send_compressed(CompressionEncoding::Gzip)
222 .accept_compressed(CompressionEncoding::Gzip);
223 }
224
225 Ok(client)
226 }
227}