Skip to main content

d_engine_client/
grpc_client.rs

1use std::sync::Arc;
2
3use arc_swap::ArcSwap;
4use bytes::Bytes;
5use d_engine_core::ScanResult;
6use d_engine_core::client::ErrorCode;
7use d_engine_core::client::KvEntry;
8use d_engine_core::config::ReadConsistencyPolicy;
9use d_engine_proto::client::ClientReadRequest;
10use d_engine_proto::client::ClientWriteRequest;
11use d_engine_proto::client::MembershipSnapshot;
12use d_engine_proto::client::ScanRequest;
13use d_engine_proto::client::WatchMembershipRequest;
14use d_engine_proto::client::WatchRequest;
15use d_engine_proto::client::WatchResponse;
16use d_engine_proto::client::WriteCommand;
17use d_engine_proto::client::raft_client_service_client::RaftClientServiceClient;
18use rand::Rng;
19use rand::SeedableRng;
20use rand::rngs::StdRng;
21use tonic::codec::CompressionEncoding;
22use tonic::transport::Channel;
23use tracing::debug;
24use tracing::error;
25use tracing::warn;
26
27use super::ClientInner;
28use crate::ClientApiError;
29use crate::ClientResponseExt;
30use crate::scoped_timer::ScopedTimer;
31use d_engine_core::client::{ClientApi, ClientApiResult};
32
33/// gRPC-based key-value store client
34///
35/// Implements remote CRUD operations via gRPC protocol.
36/// All write operations use strong consistency.
37#[derive(Clone)]
38pub struct GrpcClient {
39    pub(super) client_inner: Arc<ArcSwap<ClientInner>>,
40}
41
42impl GrpcClient {
43    pub(crate) fn new(client_inner: Arc<ArcSwap<ClientInner>>) -> Self {
44        Self { client_inner }
45    }
46
47    /// Retrieves a single key's value with explicit consistency policy
48    ///
49    /// Allows client to override server's default consistency policy for this specific request.
50    /// If server's allow_client_override is false, the override will be ignored.
51    ///
52    /// # Parameters
53    /// * `key` - The key to retrieve, accepts any type implementing `AsRef<[u8]>`
54    /// * `policy` - Explicit consistency policy for this request
55    pub async fn get_with_policy(
56        &self,
57        key: impl AsRef<[u8]>,
58        consistency_policy: Option<ReadConsistencyPolicy>,
59    ) -> std::result::Result<Option<KvEntry>, ClientApiError> {
60        // Delegate to multi-get implementation
61        let mut results =
62            self.get_multi_with_policy(std::iter::once(key), consistency_policy).await?;
63
64        // Extract single result (safe due to single-key input)
65        Ok(results.pop().unwrap_or(None))
66    }
67
68    /// Fetches multiple keys with explicit consistency policy override
69    ///
70    /// Allows client to override server's default consistency policy for this batch request.
71    /// If server's allow_client_override is false, the override will be ignored.
72    pub async fn get_multi_with_policy(
73        &self,
74        keys: impl IntoIterator<Item = impl AsRef<[u8]>>,
75        consistency_policy: Option<ReadConsistencyPolicy>,
76    ) -> std::result::Result<Vec<Option<KvEntry>>, ClientApiError> {
77        let _timer = ScopedTimer::new("client::get_multi");
78
79        let client_inner = self.client_inner.load();
80        // Convert keys to commands
81        let keys: Vec<Bytes> =
82            keys.into_iter().map(|k| Bytes::copy_from_slice(k.as_ref())).collect();
83
84        // Validate at least one key
85        if keys.is_empty() {
86            warn!("Attempted multi-get with empty key collection");
87            return Err(ErrorCode::InvalidRequest.into());
88        }
89
90        // Build request — keep a reference for result alignment after move
91        let keys_for_alignment = keys.clone();
92        let request = ClientReadRequest {
93            client_id: client_inner.client_id,
94            keys,
95            consistency_policy: consistency_policy
96                .clone()
97                .map(|p| d_engine_proto::client::ReadConsistencyPolicy::from(p) as i32),
98        };
99
100        // Select client based on policy (if specified)
101        // None means "use server default" — server default may be Linearizable,
102        // so we must send to leader to avoid rejection from followers.
103        let mut client = match consistency_policy {
104            Some(ReadConsistencyPolicy::LinearizableRead)
105            | Some(ReadConsistencyPolicy::LeaseRead)
106            | None => {
107                debug!("Using leader client for explicit consistency policy");
108                self.make_leader_client().await?
109            }
110            Some(ReadConsistencyPolicy::EventualConsistency) => {
111                debug!("Using load-balanced client for cluster default policy");
112                self.make_client().await?
113            }
114        };
115
116        // Execute request
117        match client.handle_client_read(request).await {
118            Ok(response) => {
119                debug!("Read response: {:?}", response);
120                // Server returns only results for existing keys (sparse).
121                // Reconstruct aligned vector matching input key order,
122                // filling None for keys not present in the response.
123                // Mirrors embedded_client::get_multi_with_consistency behavior.
124                let sparse = response.into_inner().into_read_results()?;
125                let results_by_key: std::collections::HashMap<bytes::Bytes, _> =
126                    sparse.into_iter().filter_map(|opt| opt.map(|r| (r.key.clone(), r))).collect();
127                Ok(keys_for_alignment.iter().map(|k| results_by_key.get(k).cloned()).collect())
128            }
129            Err(status) => {
130                error!("Read request failed: {:?}", status);
131                Err(status.into())
132            }
133        }
134    }
135
136    async fn make_leader_client(
137        &self
138    ) -> std::result::Result<RaftClientServiceClient<Channel>, ClientApiError> {
139        let client_inner = self.client_inner.load();
140
141        let channel = client_inner.pool.get_leader();
142        let mut client = RaftClientServiceClient::new(channel);
143        if client_inner.pool.config.enable_compression {
144            client = client
145                .send_compressed(CompressionEncoding::Gzip)
146                .accept_compressed(CompressionEncoding::Gzip);
147        }
148
149        Ok(client)
150    }
151
152    pub(super) async fn make_client(
153        &self
154    ) -> std::result::Result<RaftClientServiceClient<Channel>, ClientApiError> {
155        let client_inner = self.client_inner.load();
156
157        // Balance from read clients
158        let mut rng = StdRng::from_os_rng();
159        let channels = client_inner.pool.get_all_channels();
160        let i = rng.random_range(0..channels.len());
161
162        let mut client = RaftClientServiceClient::new(channels[i].clone());
163
164        if client_inner.pool.config.enable_compression {
165            client = client
166                .send_compressed(CompressionEncoding::Gzip)
167                .accept_compressed(CompressionEncoding::Gzip);
168        }
169
170        Ok(client)
171    }
172
173    /// Subscribe to committed cluster membership changes.
174    ///
175    /// Immediately yields the current `MembershipSnapshot` on connect, then one
176    /// snapshot per committed ConfChange (AddNode, Promote, Remove).
177    /// The stream ends with `Err(UNAVAILABLE)` when the server shuts down;
178    /// callers should reconnect and re-subscribe.
179    ///
180    /// Use `committed_index` as an idempotency key to deduplicate retries.
181    pub async fn watch_membership(&self) -> ClientApiResult<tonic::Streaming<MembershipSnapshot>> {
182        let client_inner = self.client_inner.load();
183
184        let request = WatchMembershipRequest {
185            client_id: client_inner.client_id,
186        };
187
188        // Any node (leader or follower) emits membership changes after commit.
189        let mut client = self.make_client().await?;
190
191        match client.watch_membership(request).await {
192            Ok(response) => {
193                debug!("Membership watch stream established");
194                Ok(response.into_inner())
195            }
196            Err(status) => {
197                error!("watch_membership request failed: {:?}", status);
198                Err(status.into())
199            }
200        }
201    }
202
203    /// Watch for changes to a specific key
204    ///
205    /// Returns a stream of watch events when the key's value changes.
206    /// The stream will continue until explicitly closed or a connection error occurs.
207    ///
208    /// # Arguments
209    ///
210    /// * `key` - The key to watch
211    ///
212    /// # Returns
213    ///
214    /// A streaming response that yields `WatchResponse` events
215    ///
216    /// # Errors
217    ///
218    /// Returns error if unable to establish watch connection
219    pub async fn watch(
220        &self,
221        key: impl AsRef<[u8]>,
222    ) -> ClientApiResult<tonic::Streaming<WatchResponse>> {
223        let client_inner = self.client_inner.load();
224
225        let request = WatchRequest {
226            client_id: client_inner.client_id,
227            key: Bytes::copy_from_slice(key.as_ref()),
228            prefix: false,
229            prev_kv: false,
230        };
231
232        // Watch can connect to any node (leader or follower)
233        let mut client = self.make_client().await?;
234
235        match client.watch(request).await {
236            Ok(response) => {
237                debug!("Watch stream established");
238                Ok(response.into_inner())
239            }
240            Err(status) => {
241                error!("Watch request failed: {:?}", status);
242                Err(status.into())
243            }
244        }
245    }
246
247    /// Watch all keys under a path prefix.
248    ///
249    /// `prefix` must start with '/' and end with '/', e.g. `b"/services/"`.
250    /// Returns a stream of events for any key whose path begins with the prefix.
251    pub async fn watch_prefix(
252        &self,
253        prefix: impl AsRef<[u8]>,
254    ) -> ClientApiResult<tonic::Streaming<WatchResponse>> {
255        let client_inner = self.client_inner.load();
256
257        let request = WatchRequest {
258            client_id: client_inner.client_id,
259            key: Bytes::copy_from_slice(prefix.as_ref()),
260            prefix: true,
261            prev_kv: false,
262        };
263
264        let mut client = self.make_client().await?;
265
266        match client.watch(request).await {
267            Ok(response) => {
268                debug!("Prefix watch stream established");
269                Ok(response.into_inner())
270            }
271            Err(status) => {
272                error!("Prefix watch request failed: {:?}", status);
273                Err(status.into())
274            }
275        }
276    }
277}
278
279// ==================== Core ClientApi Trait Implementation ====================
280
281// Implement ClientApi trait for GrpcClient
282#[async_trait::async_trait]
283impl ClientApi for GrpcClient {
284    async fn put(
285        &self,
286        key: impl AsRef<[u8]> + Send,
287        value: impl AsRef<[u8]> + Send,
288    ) -> ClientApiResult<()> {
289        // Performance tracking for put operation
290        let _timer = ScopedTimer::new("client::put");
291
292        let client_inner = self.client_inner.load();
293
294        // Build write request with insert command
295        let command = WriteCommand::insert(
296            Bytes::copy_from_slice(key.as_ref()),
297            Bytes::copy_from_slice(value.as_ref()),
298        );
299
300        let request = ClientWriteRequest {
301            client_id: client_inner.client_id,
302            command: Some(command),
303        };
304
305        // Send write request to leader node (strong consistency required)
306        let mut client = self.make_leader_client().await?;
307        match client.handle_client_write(request).await {
308            Ok(response) => {
309                debug!("[:GrpcClient:write] response: {:?}", response);
310                let client_response = response.get_ref();
311                client_response.validate_error()
312            }
313            Err(status) => {
314                error!("[:GrpcClient:write] status: {:?}", status);
315                Err(Into::<ClientApiError>::into(ClientApiError::from(status)))
316            }
317        }
318    }
319
320    async fn put_with_ttl(
321        &self,
322        key: impl AsRef<[u8]> + Send,
323        value: impl AsRef<[u8]> + Send,
324        ttl_secs: u64,
325    ) -> ClientApiResult<()> {
326        // Performance tracking for put_with_ttl operation
327        let _timer = ScopedTimer::new("client::put_with_ttl");
328
329        let client_inner = self.client_inner.load();
330
331        // Build write request with TTL-enabled insert command
332        let command = WriteCommand::insert_with_ttl(
333            Bytes::copy_from_slice(key.as_ref()),
334            Bytes::copy_from_slice(value.as_ref()),
335            ttl_secs,
336        );
337
338        let request = ClientWriteRequest {
339            client_id: client_inner.client_id,
340            command: Some(command),
341        };
342
343        // Send write request to leader node (strong consistency required)
344        let mut client = self.make_leader_client().await?;
345        match client.handle_client_write(request).await {
346            Ok(response) => {
347                debug!("[:GrpcClient:put_with_ttl] response: {:?}", response);
348                let client_response = response.get_ref();
349                client_response.validate_error()
350            }
351            Err(status) => {
352                error!("[:GrpcClient:put_with_ttl] status: {:?}", status);
353                Err(Into::<ClientApiError>::into(ClientApiError::from(status)))
354            }
355        }
356    }
357
358    async fn get(
359        &self,
360        key: impl AsRef<[u8]> + Send,
361    ) -> ClientApiResult<Option<Bytes>> {
362        // Delegate to get_with_policy with server's default consistency policy
363        let result = self.get_with_policy(key, None).await;
364
365        match result {
366            Ok(Some(client_result)) => Ok(Some(client_result.value)),
367            Ok(None) => Ok(None),
368            Err(e) => Err(Into::<ClientApiError>::into(e)),
369        }
370    }
371
372    async fn get_multi(
373        &self,
374        keys: &[Bytes],
375    ) -> ClientApiResult<Vec<Option<Bytes>>> {
376        // Delegate to get_multi_with_policy with server's default consistency policy
377        let result = self.get_multi_with_policy(keys.iter().cloned(), None).await;
378
379        match result {
380            Ok(results) => {
381                // Extract values from ClientResult, preserving None for missing keys
382                Ok(results.into_iter().map(|opt| opt.map(|r| r.value)).collect())
383            }
384            Err(e) => Err(Into::<ClientApiError>::into(e)),
385        }
386    }
387
388    async fn delete(
389        &self,
390        key: impl AsRef<[u8]> + Send,
391    ) -> ClientApiResult<()> {
392        let client_inner = self.client_inner.load();
393
394        // Build delete request
395        let command = WriteCommand::delete(Bytes::copy_from_slice(key.as_ref()));
396
397        let request = ClientWriteRequest {
398            client_id: client_inner.client_id,
399            command: Some(command),
400        };
401
402        // Send delete request to leader node (strong consistency required)
403        let mut client = self.make_leader_client().await?;
404        match client.handle_client_write(request).await {
405            Ok(response) => {
406                debug!("[:GrpcClient:delete] response: {:?}", response);
407                let client_response = response.get_ref();
408                client_response.validate_error()
409            }
410            Err(status) => {
411                error!("[:GrpcClient:delete] status: {:?}", status);
412                Err(Into::<ClientApiError>::into(ClientApiError::from(status)))
413            }
414        }
415    }
416
417    async fn compare_and_swap(
418        &self,
419        key: impl AsRef<[u8]> + Send,
420        expected_value: Option<impl AsRef<[u8]> + Send>,
421        new_value: impl AsRef<[u8]> + Send,
422    ) -> ClientApiResult<bool> {
423        let client_inner = self.client_inner.load();
424
425        // Build CAS request
426        let expected = expected_value.map(|v| Bytes::copy_from_slice(v.as_ref()));
427        let command = WriteCommand::compare_and_swap(
428            Bytes::copy_from_slice(key.as_ref()),
429            expected,
430            Bytes::copy_from_slice(new_value.as_ref()),
431        );
432
433        let request = ClientWriteRequest {
434            client_id: client_inner.client_id,
435            command: Some(command),
436        };
437
438        // Send CAS request to leader node
439        let mut client = self.make_leader_client().await?;
440        match client.handle_client_write(request).await {
441            Ok(response) => {
442                debug!("[:GrpcClient:compare_and_swap] response: {:?}", response);
443                let client_response = response.get_ref();
444
445                // Validate no error occurred
446                client_response.validate_error()?;
447
448                // Extract CAS result (true = succeeded, false = failed comparison)
449                Ok(client_response.is_write_success())
450            }
451            Err(status) => {
452                error!("[:GrpcClient:compare_and_swap] status: {:?}", status);
453                Err(Into::<ClientApiError>::into(ClientApiError::from(status)))
454            }
455        }
456    }
457
458    async fn list_members(
459        &self
460    ) -> ClientApiResult<Vec<d_engine_proto::server::cluster::NodeMeta>> {
461        let client_inner = self.client_inner.load();
462        Ok(client_inner.pool.get_all_members())
463    }
464
465    async fn get_leader_id(&self) -> ClientApiResult<Option<u32>> {
466        let client_inner = self.client_inner.load();
467        Ok(client_inner.pool.get_leader_id())
468    }
469
470    async fn get_multi_with_policy(
471        &self,
472        keys: &[Bytes],
473        consistency_policy: Option<ReadConsistencyPolicy>,
474    ) -> ClientApiResult<Vec<Option<Bytes>>> {
475        // Explicitly call the convenience method on impl block, not trait method
476        let result =
477            <Self>::get_multi_with_policy(self, keys.iter().cloned(), consistency_policy).await;
478
479        match result {
480            Ok(results) => Ok(results.into_iter().map(|opt| opt.map(|r| r.value)).collect()),
481            Err(e) => Err(e),
482        }
483    }
484
485    async fn get_linearizable(
486        &self,
487        key: impl AsRef<[u8]> + Send,
488    ) -> ClientApiResult<Option<Bytes>> {
489        let result = self.get_with_policy(key, Some(ReadConsistencyPolicy::LinearizableRead)).await;
490
491        match result {
492            Ok(Some(client_result)) => Ok(Some(client_result.value)),
493            Ok(None) => Ok(None),
494            Err(e) => Err(e),
495        }
496    }
497
498    async fn get_lease(
499        &self,
500        key: impl AsRef<[u8]> + Send,
501    ) -> ClientApiResult<Option<Bytes>> {
502        let result = self.get_with_policy(key, Some(ReadConsistencyPolicy::LeaseRead)).await;
503
504        match result {
505            Ok(Some(client_result)) => Ok(Some(client_result.value)),
506            Ok(None) => Ok(None),
507            Err(e) => Err(e),
508        }
509    }
510
511    async fn get_eventual(
512        &self,
513        key: impl AsRef<[u8]> + Send,
514    ) -> ClientApiResult<Option<Bytes>> {
515        let result = self
516            .get_with_policy(key, Some(ReadConsistencyPolicy::EventualConsistency))
517            .await;
518
519        match result {
520            Ok(Some(client_result)) => Ok(Some(client_result.value)),
521            Ok(None) => Ok(None),
522            Err(e) => Err(e),
523        }
524    }
525
526    async fn scan_prefix(
527        &self,
528        prefix: impl AsRef<[u8]> + Send,
529    ) -> ClientApiResult<ScanResult> {
530        let client_inner = self.client_inner.load();
531        let mut client = self.make_leader_client().await?;
532
533        let request = ScanRequest {
534            client_id: client_inner.client_id,
535            prefix: Bytes::copy_from_slice(prefix.as_ref()),
536        };
537
538        let response = client
539            .handle_client_scan(request)
540            .await
541            .map_err(ClientApiError::from)?
542            .into_inner();
543
544        Ok(ScanResult {
545            entries: response.entries.into_iter().map(|e| (e.key, e.value)).collect(),
546            revision: response.revision,
547        })
548    }
549}