1use std::sync::Arc;
2
3use arc_swap::ArcSwap;
4use bytes::Bytes;
5use d_engine_proto::client::ClientReadRequest;
6use d_engine_proto::client::ClientResult;
7use d_engine_proto::client::ClientWriteRequest;
8use d_engine_proto::client::ReadConsistencyPolicy;
9use d_engine_proto::client::WatchRequest;
10use d_engine_proto::client::WatchResponse;
11use d_engine_proto::client::WriteCommand;
12use d_engine_proto::client::raft_client_service_client::RaftClientServiceClient;
13use d_engine_proto::error::ErrorCode;
14use rand::Rng;
15use rand::SeedableRng;
16use rand::rngs::StdRng;
17use tonic::codec::CompressionEncoding;
18use tonic::transport::Channel;
19use tracing::debug;
20use tracing::error;
21use tracing::warn;
22
23use super::ClientInner;
24use crate::ClientApiError;
25use crate::ClientResponseExt;
26use crate::KvClient as CoreKvClient;
27use crate::KvResult;
28use crate::scoped_timer::ScopedTimer;
29
30#[derive(Clone)]
35pub struct GrpcKvClient {
36 pub(super) client_inner: Arc<ArcSwap<ClientInner>>,
37}
38
39impl GrpcKvClient {
40 pub(crate) fn new(client_inner: Arc<ArcSwap<ClientInner>>) -> Self {
41 Self { client_inner }
42 }
43
44 pub async fn put(
51 &self,
52 key: impl AsRef<[u8]>,
53 value: impl AsRef<[u8]>,
54 ) -> std::result::Result<(), ClientApiError> {
55 let _timer = ScopedTimer::new("client::put");
56
57 let client_inner = self.client_inner.load();
58
59 let mut commands = Vec::new();
61 let client_command_insert = WriteCommand::insert(
62 Bytes::copy_from_slice(key.as_ref()),
63 Bytes::copy_from_slice(value.as_ref()),
64 );
65 commands.push(client_command_insert);
66
67 let request = ClientWriteRequest {
68 client_id: client_inner.client_id,
69 commands,
70 };
71
72 let mut client = self.make_leader_client().await?;
73 match client.handle_client_write(request).await {
75 Ok(response) => {
76 debug!("[:KvClient:write] response: {:?}", response);
77 let client_response = response.get_ref();
78 client_response.validate_error()
79 }
80 Err(status) => {
81 error!("[:KvClient:write] status: {:?}", status);
82 Err(status.into())
83 }
84 }
85 }
86
87 pub async fn put_with_ttl(
101 &self,
102 key: impl AsRef<[u8]>,
103 value: impl AsRef<[u8]>,
104 ttl_secs: u64,
105 ) -> std::result::Result<(), ClientApiError> {
106 let _timer = ScopedTimer::new("client::put_with_ttl");
107
108 let client_inner = self.client_inner.load();
109
110 let mut commands = Vec::new();
112 let client_command_insert = WriteCommand::insert_with_ttl(
113 Bytes::copy_from_slice(key.as_ref()),
114 Bytes::copy_from_slice(value.as_ref()),
115 ttl_secs,
116 );
117 commands.push(client_command_insert);
118
119 let request = ClientWriteRequest {
120 client_id: client_inner.client_id,
121 commands,
122 };
123
124 let mut client = self.make_leader_client().await?;
125 match client.handle_client_write(request).await {
127 Ok(response) => {
128 debug!("[:KvClient:put_with_ttl] response: {:?}", response);
129 let client_response = response.get_ref();
130 client_response.validate_error()
131 }
132 Err(status) => {
133 error!("[:KvClient:put_with_ttl] status: {:?}", status);
134 Err(status.into())
135 }
136 }
137 }
138
139 pub async fn delete(
152 &self,
153 key: impl AsRef<[u8]>,
154 ) -> std::result::Result<(), ClientApiError> {
155 let client_inner = self.client_inner.load();
156 let mut commands = Vec::new();
158 let client_command_delete = WriteCommand::delete(Bytes::copy_from_slice(key.as_ref()));
159 commands.push(client_command_delete);
160
161 let request = ClientWriteRequest {
162 client_id: client_inner.client_id,
163 commands,
164 };
165
166 let mut client = self.make_leader_client().await?;
167
168 match client.handle_client_write(request).await {
170 Ok(response) => {
171 debug!("[:KvClient:delete] response: {:?}", response);
172 let client_response = response.get_ref();
173 client_response.validate_error()
174 }
175 Err(status) => {
176 error!("[:KvClient:delete] status: {:?}", status);
177 Err(status.into())
178 }
179 }
180 }
181
182 pub async fn get_linearizable(
184 &self,
185 key: impl AsRef<[u8]>,
186 ) -> std::result::Result<Option<ClientResult>, ClientApiError> {
187 self.get_with_policy(key, Some(ReadConsistencyPolicy::LinearizableRead)).await
188 }
189
190 pub async fn get_lease(
191 &self,
192 key: impl AsRef<[u8]>,
193 ) -> std::result::Result<Option<ClientResult>, ClientApiError> {
194 self.get_with_policy(key, Some(ReadConsistencyPolicy::LeaseRead)).await
195 }
196
197 pub async fn get_eventual(
198 &self,
199 key: impl AsRef<[u8]>,
200 ) -> std::result::Result<Option<ClientResult>, ClientApiError> {
201 self.get_with_policy(key, Some(ReadConsistencyPolicy::EventualConsistency))
202 .await
203 }
204
205 pub async fn get(
218 &self,
219 key: impl AsRef<[u8]>,
220 ) -> std::result::Result<Option<ClientResult>, ClientApiError> {
221 self.get_with_policy(key, None).await
222 }
223
224 pub async fn get_with_policy(
233 &self,
234 key: impl AsRef<[u8]>,
235 consistency_policy: Option<ReadConsistencyPolicy>,
236 ) -> std::result::Result<Option<ClientResult>, ClientApiError> {
237 let mut results =
239 self.get_multi_with_policy(std::iter::once(key), consistency_policy).await?;
240
241 Ok(results.pop().unwrap_or(None))
243 }
244
245 pub async fn get_multi(
250 &self,
251 keys: impl IntoIterator<Item = impl AsRef<[u8]>>,
252 ) -> std::result::Result<Vec<Option<ClientResult>>, ClientApiError> {
253 self.get_multi_with_policy(keys, None).await
254 }
255
256 pub async fn get_multi_with_policy(
261 &self,
262 keys: impl IntoIterator<Item = impl AsRef<[u8]>>,
263 consistency_policy: Option<ReadConsistencyPolicy>,
264 ) -> std::result::Result<Vec<Option<ClientResult>>, ClientApiError> {
265 let _timer = ScopedTimer::new("client::get_multi");
266
267 let client_inner = self.client_inner.load();
268 let keys: Vec<Bytes> =
270 keys.into_iter().map(|k| Bytes::copy_from_slice(k.as_ref())).collect();
271
272 if keys.is_empty() {
274 warn!("Attempted multi-get with empty key collection");
275 return Err(ErrorCode::InvalidRequest.into());
276 }
277
278 let request = ClientReadRequest {
280 client_id: client_inner.client_id,
281 keys,
282 consistency_policy: consistency_policy.map(|p| p as i32),
283 };
284
285 let mut client = match consistency_policy {
287 Some(ReadConsistencyPolicy::LinearizableRead)
288 | Some(ReadConsistencyPolicy::LeaseRead) => {
289 debug!("Using leader client for explicit consistency policy");
290 self.make_leader_client().await?
291 }
292 Some(ReadConsistencyPolicy::EventualConsistency) | None => {
293 debug!("Using load-balanced client for cluster default policy");
294 self.make_client().await?
295 }
296 };
297
298 match client.handle_client_read(request).await {
300 Ok(response) => {
301 debug!("Read response: {:?}", response);
302 response.into_inner().into_read_results()
303 }
304 Err(status) => {
305 error!("Read request failed: {:?}", status);
306 Err(status.into())
307 }
308 }
309 }
310
311 pub async fn watch(
336 &self,
337 key: impl AsRef<[u8]>,
338 ) -> std::result::Result<tonic::Streaming<WatchResponse>, ClientApiError> {
339 let client_inner = self.client_inner.load();
340
341 let request = WatchRequest {
342 client_id: client_inner.client_id,
343 key: Bytes::copy_from_slice(key.as_ref()),
344 };
345
346 let mut client = self.make_client().await?;
348
349 match client.watch(request).await {
350 Ok(response) => {
351 debug!("Watch stream established");
352 Ok(response.into_inner())
353 }
354 Err(status) => {
355 error!("Watch request failed: {:?}", status);
356 Err(status.into())
357 }
358 }
359 }
360
361 async fn make_leader_client(
362 &self
363 ) -> std::result::Result<RaftClientServiceClient<Channel>, ClientApiError> {
364 let client_inner = self.client_inner.load();
365
366 let channel = client_inner.pool.get_leader();
367 let mut client = RaftClientServiceClient::new(channel);
368 if client_inner.pool.config.enable_compression {
369 client = client
370 .send_compressed(CompressionEncoding::Gzip)
371 .accept_compressed(CompressionEncoding::Gzip);
372 }
373
374 Ok(client)
375 }
376
377 pub(super) async fn make_client(
378 &self
379 ) -> std::result::Result<RaftClientServiceClient<Channel>, ClientApiError> {
380 let client_inner = self.client_inner.load();
381
382 let mut rng = StdRng::from_entropy();
384 let channels = client_inner.pool.get_all_channels();
385 let i = rng.gen_range(0..channels.len());
386
387 let mut client = RaftClientServiceClient::new(channels[i].clone());
388
389 if client_inner.pool.config.enable_compression {
390 client = client
391 .send_compressed(CompressionEncoding::Gzip)
392 .accept_compressed(CompressionEncoding::Gzip);
393 }
394
395 Ok(client)
396 }
397}
398
399#[async_trait::async_trait]
403impl CoreKvClient for GrpcKvClient {
404 async fn put(
405 &self,
406 key: impl AsRef<[u8]> + Send,
407 value: impl AsRef<[u8]> + Send,
408 ) -> KvResult<()> {
409 GrpcKvClient::put(self, key, value).await.map_err(Into::into)
410 }
411
412 async fn put_with_ttl(
413 &self,
414 key: impl AsRef<[u8]> + Send,
415 value: impl AsRef<[u8]> + Send,
416 ttl_secs: u64,
417 ) -> KvResult<()> {
418 GrpcKvClient::put_with_ttl(self, key, value, ttl_secs).await.map_err(Into::into)
419 }
420
421 async fn get(
422 &self,
423 key: impl AsRef<[u8]> + Send,
424 ) -> KvResult<Option<Bytes>> {
425 match GrpcKvClient::get(self, key).await {
426 Ok(Some(result)) => Ok(Some(result.value)),
427 Ok(None) => Ok(None),
428 Err(e) => Err(e.into()),
429 }
430 }
431
432 async fn get_multi(
433 &self,
434 keys: &[Bytes],
435 ) -> KvResult<Vec<Option<Bytes>>> {
436 match GrpcKvClient::get_multi(self, keys.iter().cloned()).await {
437 Ok(results) => Ok(results.into_iter().map(|opt| opt.map(|r| r.value)).collect()),
438 Err(e) => Err(e.into()),
439 }
440 }
441
442 async fn delete(
443 &self,
444 key: impl AsRef<[u8]> + Send,
445 ) -> KvResult<()> {
446 GrpcKvClient::delete(self, key).await.map_err(Into::into)
447 }
448}