1use std::sync::Arc;
2
3use arc_swap::ArcSwap;
4use bytes::Bytes;
5use d_engine_core::ScanResult;
6use d_engine_core::client::ErrorCode;
7use d_engine_core::client::KvEntry;
8use d_engine_core::config::ReadConsistencyPolicy;
9use d_engine_proto::client::ClientReadRequest;
10use d_engine_proto::client::ClientWriteRequest;
11use d_engine_proto::client::MembershipSnapshot;
12use d_engine_proto::client::ScanRequest;
13use d_engine_proto::client::WatchMembershipRequest;
14use d_engine_proto::client::WatchRequest;
15use d_engine_proto::client::WatchResponse;
16use d_engine_proto::client::WriteCommand;
17use d_engine_proto::client::raft_client_service_client::RaftClientServiceClient;
18use rand::Rng;
19use rand::SeedableRng;
20use rand::rngs::StdRng;
21use tonic::codec::CompressionEncoding;
22use tonic::transport::Channel;
23use tracing::debug;
24use tracing::error;
25use tracing::warn;
26
27use super::ClientInner;
28use crate::ClientApiError;
29use crate::ClientResponseExt;
30use crate::scoped_timer::ScopedTimer;
31use d_engine_core::client::{ClientApi, ClientApiResult};
32
/// gRPC-backed client handle.
///
/// The only field is an `Arc`, so cloning a `GrpcClient` yields another handle
/// to the same swappable [`ClientInner`].
#[derive(Clone)]
pub struct GrpcClient {
    /// Shared, atomically swappable client state. Each operation takes its own
    /// snapshot of it through `load()`.
    pub(super) client_inner: Arc<ArcSwap<ClientInner>>,
}
41
42impl GrpcClient {
43 pub(crate) fn new(client_inner: Arc<ArcSwap<ClientInner>>) -> Self {
44 Self { client_inner }
45 }
46
47 pub async fn get_with_policy(
56 &self,
57 key: impl AsRef<[u8]>,
58 consistency_policy: Option<ReadConsistencyPolicy>,
59 ) -> std::result::Result<Option<KvEntry>, ClientApiError> {
60 let mut results =
62 self.get_multi_with_policy(std::iter::once(key), consistency_policy).await?;
63
64 Ok(results.pop().unwrap_or(None))
66 }
67
68 pub async fn get_multi_with_policy(
73 &self,
74 keys: impl IntoIterator<Item = impl AsRef<[u8]>>,
75 consistency_policy: Option<ReadConsistencyPolicy>,
76 ) -> std::result::Result<Vec<Option<KvEntry>>, ClientApiError> {
77 let _timer = ScopedTimer::new("client::get_multi");
78
79 let client_inner = self.client_inner.load();
80 let keys: Vec<Bytes> =
82 keys.into_iter().map(|k| Bytes::copy_from_slice(k.as_ref())).collect();
83
84 if keys.is_empty() {
86 warn!("Attempted multi-get with empty key collection");
87 return Err(ErrorCode::InvalidRequest.into());
88 }
89
90 let keys_for_alignment = keys.clone();
92 let request = ClientReadRequest {
93 client_id: client_inner.client_id,
94 keys,
95 consistency_policy: consistency_policy
96 .clone()
97 .map(|p| d_engine_proto::client::ReadConsistencyPolicy::from(p) as i32),
98 };
99
100 let mut client = match consistency_policy {
104 Some(ReadConsistencyPolicy::LinearizableRead)
105 | Some(ReadConsistencyPolicy::LeaseRead)
106 | None => {
107 debug!("Using leader client for explicit consistency policy");
108 self.make_leader_client().await?
109 }
110 Some(ReadConsistencyPolicy::EventualConsistency) => {
111 debug!("Using load-balanced client for cluster default policy");
112 self.make_client().await?
113 }
114 };
115
116 match client.handle_client_read(request).await {
118 Ok(response) => {
119 debug!("Read response: {:?}", response);
120 let sparse = response.into_inner().into_read_results()?;
125 let results_by_key: std::collections::HashMap<bytes::Bytes, _> =
126 sparse.into_iter().filter_map(|opt| opt.map(|r| (r.key.clone(), r))).collect();
127 Ok(keys_for_alignment.iter().map(|k| results_by_key.get(k).cloned()).collect())
128 }
129 Err(status) => {
130 error!("Read request failed: {:?}", status);
131 Err(status.into())
132 }
133 }
134 }
135
136 async fn make_leader_client(
137 &self
138 ) -> std::result::Result<RaftClientServiceClient<Channel>, ClientApiError> {
139 let client_inner = self.client_inner.load();
140
141 let channel = client_inner.pool.get_leader();
142 let mut client = RaftClientServiceClient::new(channel);
143 if client_inner.pool.config.enable_compression {
144 client = client
145 .send_compressed(CompressionEncoding::Gzip)
146 .accept_compressed(CompressionEncoding::Gzip);
147 }
148
149 Ok(client)
150 }
151
152 pub(super) async fn make_client(
153 &self
154 ) -> std::result::Result<RaftClientServiceClient<Channel>, ClientApiError> {
155 let client_inner = self.client_inner.load();
156
157 let mut rng = StdRng::from_os_rng();
159 let channels = client_inner.pool.get_all_channels();
160 let i = rng.random_range(0..channels.len());
161
162 let mut client = RaftClientServiceClient::new(channels[i].clone());
163
164 if client_inner.pool.config.enable_compression {
165 client = client
166 .send_compressed(CompressionEncoding::Gzip)
167 .accept_compressed(CompressionEncoding::Gzip);
168 }
169
170 Ok(client)
171 }
172
173 pub async fn watch_membership(&self) -> ClientApiResult<tonic::Streaming<MembershipSnapshot>> {
182 let client_inner = self.client_inner.load();
183
184 let request = WatchMembershipRequest {
185 client_id: client_inner.client_id,
186 };
187
188 let mut client = self.make_client().await?;
190
191 match client.watch_membership(request).await {
192 Ok(response) => {
193 debug!("Membership watch stream established");
194 Ok(response.into_inner())
195 }
196 Err(status) => {
197 error!("watch_membership request failed: {:?}", status);
198 Err(status.into())
199 }
200 }
201 }
202
203 pub async fn watch(
220 &self,
221 key: impl AsRef<[u8]>,
222 ) -> ClientApiResult<tonic::Streaming<WatchResponse>> {
223 let client_inner = self.client_inner.load();
224
225 let request = WatchRequest {
226 client_id: client_inner.client_id,
227 key: Bytes::copy_from_slice(key.as_ref()),
228 prefix: false,
229 prev_kv: false,
230 };
231
232 let mut client = self.make_client().await?;
234
235 match client.watch(request).await {
236 Ok(response) => {
237 debug!("Watch stream established");
238 Ok(response.into_inner())
239 }
240 Err(status) => {
241 error!("Watch request failed: {:?}", status);
242 Err(status.into())
243 }
244 }
245 }
246
247 pub async fn watch_prefix(
252 &self,
253 prefix: impl AsRef<[u8]>,
254 ) -> ClientApiResult<tonic::Streaming<WatchResponse>> {
255 let client_inner = self.client_inner.load();
256
257 let request = WatchRequest {
258 client_id: client_inner.client_id,
259 key: Bytes::copy_from_slice(prefix.as_ref()),
260 prefix: true,
261 prev_kv: false,
262 };
263
264 let mut client = self.make_client().await?;
265
266 match client.watch(request).await {
267 Ok(response) => {
268 debug!("Prefix watch stream established");
269 Ok(response.into_inner())
270 }
271 Err(status) => {
272 error!("Prefix watch request failed: {:?}", status);
273 Err(status.into())
274 }
275 }
276 }
277}
278
279#[async_trait::async_trait]
283impl ClientApi for GrpcClient {
284 async fn put(
285 &self,
286 key: impl AsRef<[u8]> + Send,
287 value: impl AsRef<[u8]> + Send,
288 ) -> ClientApiResult<()> {
289 let _timer = ScopedTimer::new("client::put");
291
292 let client_inner = self.client_inner.load();
293
294 let command = WriteCommand::insert(
296 Bytes::copy_from_slice(key.as_ref()),
297 Bytes::copy_from_slice(value.as_ref()),
298 );
299
300 let request = ClientWriteRequest {
301 client_id: client_inner.client_id,
302 command: Some(command),
303 };
304
305 let mut client = self.make_leader_client().await?;
307 match client.handle_client_write(request).await {
308 Ok(response) => {
309 debug!("[:GrpcClient:write] response: {:?}", response);
310 let client_response = response.get_ref();
311 client_response.validate_error()
312 }
313 Err(status) => {
314 error!("[:GrpcClient:write] status: {:?}", status);
315 Err(Into::<ClientApiError>::into(ClientApiError::from(status)))
316 }
317 }
318 }
319
320 async fn put_with_ttl(
321 &self,
322 key: impl AsRef<[u8]> + Send,
323 value: impl AsRef<[u8]> + Send,
324 ttl_secs: u64,
325 ) -> ClientApiResult<()> {
326 let _timer = ScopedTimer::new("client::put_with_ttl");
328
329 let client_inner = self.client_inner.load();
330
331 let command = WriteCommand::insert_with_ttl(
333 Bytes::copy_from_slice(key.as_ref()),
334 Bytes::copy_from_slice(value.as_ref()),
335 ttl_secs,
336 );
337
338 let request = ClientWriteRequest {
339 client_id: client_inner.client_id,
340 command: Some(command),
341 };
342
343 let mut client = self.make_leader_client().await?;
345 match client.handle_client_write(request).await {
346 Ok(response) => {
347 debug!("[:GrpcClient:put_with_ttl] response: {:?}", response);
348 let client_response = response.get_ref();
349 client_response.validate_error()
350 }
351 Err(status) => {
352 error!("[:GrpcClient:put_with_ttl] status: {:?}", status);
353 Err(Into::<ClientApiError>::into(ClientApiError::from(status)))
354 }
355 }
356 }
357
358 async fn get(
359 &self,
360 key: impl AsRef<[u8]> + Send,
361 ) -> ClientApiResult<Option<Bytes>> {
362 let result = self.get_with_policy(key, None).await;
364
365 match result {
366 Ok(Some(client_result)) => Ok(Some(client_result.value)),
367 Ok(None) => Ok(None),
368 Err(e) => Err(Into::<ClientApiError>::into(e)),
369 }
370 }
371
372 async fn get_multi(
373 &self,
374 keys: &[Bytes],
375 ) -> ClientApiResult<Vec<Option<Bytes>>> {
376 let result = self.get_multi_with_policy(keys.iter().cloned(), None).await;
378
379 match result {
380 Ok(results) => {
381 Ok(results.into_iter().map(|opt| opt.map(|r| r.value)).collect())
383 }
384 Err(e) => Err(Into::<ClientApiError>::into(e)),
385 }
386 }
387
388 async fn delete(
389 &self,
390 key: impl AsRef<[u8]> + Send,
391 ) -> ClientApiResult<()> {
392 let client_inner = self.client_inner.load();
393
394 let command = WriteCommand::delete(Bytes::copy_from_slice(key.as_ref()));
396
397 let request = ClientWriteRequest {
398 client_id: client_inner.client_id,
399 command: Some(command),
400 };
401
402 let mut client = self.make_leader_client().await?;
404 match client.handle_client_write(request).await {
405 Ok(response) => {
406 debug!("[:GrpcClient:delete] response: {:?}", response);
407 let client_response = response.get_ref();
408 client_response.validate_error()
409 }
410 Err(status) => {
411 error!("[:GrpcClient:delete] status: {:?}", status);
412 Err(Into::<ClientApiError>::into(ClientApiError::from(status)))
413 }
414 }
415 }
416
417 async fn compare_and_swap(
418 &self,
419 key: impl AsRef<[u8]> + Send,
420 expected_value: Option<impl AsRef<[u8]> + Send>,
421 new_value: impl AsRef<[u8]> + Send,
422 ) -> ClientApiResult<bool> {
423 let client_inner = self.client_inner.load();
424
425 let expected = expected_value.map(|v| Bytes::copy_from_slice(v.as_ref()));
427 let command = WriteCommand::compare_and_swap(
428 Bytes::copy_from_slice(key.as_ref()),
429 expected,
430 Bytes::copy_from_slice(new_value.as_ref()),
431 );
432
433 let request = ClientWriteRequest {
434 client_id: client_inner.client_id,
435 command: Some(command),
436 };
437
438 let mut client = self.make_leader_client().await?;
440 match client.handle_client_write(request).await {
441 Ok(response) => {
442 debug!("[:GrpcClient:compare_and_swap] response: {:?}", response);
443 let client_response = response.get_ref();
444
445 client_response.validate_error()?;
447
448 Ok(client_response.is_write_success())
450 }
451 Err(status) => {
452 error!("[:GrpcClient:compare_and_swap] status: {:?}", status);
453 Err(Into::<ClientApiError>::into(ClientApiError::from(status)))
454 }
455 }
456 }
457
458 async fn list_members(
459 &self
460 ) -> ClientApiResult<Vec<d_engine_proto::server::cluster::NodeMeta>> {
461 let client_inner = self.client_inner.load();
462 Ok(client_inner.pool.get_all_members())
463 }
464
465 async fn get_leader_id(&self) -> ClientApiResult<Option<u32>> {
466 let client_inner = self.client_inner.load();
467 Ok(client_inner.pool.get_leader_id())
468 }
469
470 async fn get_multi_with_policy(
471 &self,
472 keys: &[Bytes],
473 consistency_policy: Option<ReadConsistencyPolicy>,
474 ) -> ClientApiResult<Vec<Option<Bytes>>> {
475 let result =
477 <Self>::get_multi_with_policy(self, keys.iter().cloned(), consistency_policy).await;
478
479 match result {
480 Ok(results) => Ok(results.into_iter().map(|opt| opt.map(|r| r.value)).collect()),
481 Err(e) => Err(e),
482 }
483 }
484
485 async fn get_linearizable(
486 &self,
487 key: impl AsRef<[u8]> + Send,
488 ) -> ClientApiResult<Option<Bytes>> {
489 let result = self.get_with_policy(key, Some(ReadConsistencyPolicy::LinearizableRead)).await;
490
491 match result {
492 Ok(Some(client_result)) => Ok(Some(client_result.value)),
493 Ok(None) => Ok(None),
494 Err(e) => Err(e),
495 }
496 }
497
498 async fn get_lease(
499 &self,
500 key: impl AsRef<[u8]> + Send,
501 ) -> ClientApiResult<Option<Bytes>> {
502 let result = self.get_with_policy(key, Some(ReadConsistencyPolicy::LeaseRead)).await;
503
504 match result {
505 Ok(Some(client_result)) => Ok(Some(client_result.value)),
506 Ok(None) => Ok(None),
507 Err(e) => Err(e),
508 }
509 }
510
511 async fn get_eventual(
512 &self,
513 key: impl AsRef<[u8]> + Send,
514 ) -> ClientApiResult<Option<Bytes>> {
515 let result = self
516 .get_with_policy(key, Some(ReadConsistencyPolicy::EventualConsistency))
517 .await;
518
519 match result {
520 Ok(Some(client_result)) => Ok(Some(client_result.value)),
521 Ok(None) => Ok(None),
522 Err(e) => Err(e),
523 }
524 }
525
526 async fn scan_prefix(
527 &self,
528 prefix: impl AsRef<[u8]> + Send,
529 ) -> ClientApiResult<ScanResult> {
530 let client_inner = self.client_inner.load();
531 let mut client = self.make_leader_client().await?;
532
533 let request = ScanRequest {
534 client_id: client_inner.client_id,
535 prefix: Bytes::copy_from_slice(prefix.as_ref()),
536 };
537
538 let response = client
539 .handle_client_scan(request)
540 .await
541 .map_err(ClientApiError::from)?
542 .into_inner();
543
544 Ok(ScanResult {
545 entries: response.entries.into_iter().map(|e| (e.key, e.value)).collect(),
546 revision: response.revision,
547 })
548 }
549}