d_engine_client/
grpc_kv_client.rs

1use std::sync::Arc;
2
3use arc_swap::ArcSwap;
4use bytes::Bytes;
5use d_engine_proto::client::ClientReadRequest;
6use d_engine_proto::client::ClientResult;
7use d_engine_proto::client::ClientWriteRequest;
8use d_engine_proto::client::ReadConsistencyPolicy;
9use d_engine_proto::client::WatchRequest;
10use d_engine_proto::client::WatchResponse;
11use d_engine_proto::client::WriteCommand;
12use d_engine_proto::client::raft_client_service_client::RaftClientServiceClient;
13use d_engine_proto::error::ErrorCode;
14use rand::Rng;
15use rand::SeedableRng;
16use rand::rngs::StdRng;
17use tonic::codec::CompressionEncoding;
18use tonic::transport::Channel;
19use tracing::debug;
20use tracing::error;
21use tracing::warn;
22
23use super::ClientInner;
24use crate::ClientApiError;
25use crate::ClientResponseExt;
26use crate::KvClient as CoreKvClient;
27use crate::KvResult;
28use crate::scoped_timer::ScopedTimer;
29
/// gRPC-based key-value store client
///
/// Implements remote CRUD operations via gRPC protocol.
/// All write operations use strong consistency.
///
/// Cloning is cheap: the only field is an `Arc`, so every clone shares the
/// same underlying [`ClientInner`] handle.
#[derive(Clone)]
pub struct GrpcKvClient {
    /// Shared, swappable client state. The methods of this type read the
    /// `client_id` and the connection `pool` from the currently loaded value.
    pub(super) client_inner: Arc<ArcSwap<ClientInner>>,
}
38
39impl GrpcKvClient {
40    pub(crate) fn new(client_inner: Arc<ArcSwap<ClientInner>>) -> Self {
41        Self { client_inner }
42    }
43
44    /// Stores a value with strong consistency
45    ///
46    /// # Errors
47    /// - [`crate::ClientApiError::Network`] on network failures
48    /// - [`crate::ClientApiError::Protocol`] for protocol errors
49    /// - [`crate::ClientApiError::Storage`] for server-side storage errors
50    pub async fn put(
51        &self,
52        key: impl AsRef<[u8]>,
53        value: impl AsRef<[u8]>,
54    ) -> std::result::Result<(), ClientApiError> {
55        let _timer = ScopedTimer::new("client::put");
56
57        let client_inner = self.client_inner.load();
58
59        // Build request
60        let mut commands = Vec::new();
61        let client_command_insert = WriteCommand::insert(
62            Bytes::copy_from_slice(key.as_ref()),
63            Bytes::copy_from_slice(value.as_ref()),
64        );
65        commands.push(client_command_insert);
66
67        let request = ClientWriteRequest {
68            client_id: client_inner.client_id,
69            commands,
70        };
71
72        let mut client = self.make_leader_client().await?;
73        // Send write request
74        match client.handle_client_write(request).await {
75            Ok(response) => {
76                debug!("[:KvClient:write] response: {:?}", response);
77                let client_response = response.get_ref();
78                client_response.validate_error()
79            }
80            Err(status) => {
81                error!("[:KvClient:write] status: {:?}", status);
82                Err(status.into())
83            }
84        }
85    }
86
87    /// Stores a value with TTL (time-to-live) and strong consistency
88    ///
89    /// Key will automatically expire and be deleted after ttl_secs seconds.
90    ///
91    /// # Arguments
92    /// * `key` - The key to store
93    /// * `value` - The value to store
94    /// * `ttl_secs` - Time-to-live in seconds
95    ///
96    /// # Errors
97    /// - [`crate::ClientApiError::Network`] on network failures
98    /// - [`crate::ClientApiError::Protocol`] for protocol errors
99    /// - [`crate::ClientApiError::Storage`] for server-side storage errors
100    pub async fn put_with_ttl(
101        &self,
102        key: impl AsRef<[u8]>,
103        value: impl AsRef<[u8]>,
104        ttl_secs: u64,
105    ) -> std::result::Result<(), ClientApiError> {
106        let _timer = ScopedTimer::new("client::put_with_ttl");
107
108        let client_inner = self.client_inner.load();
109
110        // Build request with TTL
111        let mut commands = Vec::new();
112        let client_command_insert = WriteCommand::insert_with_ttl(
113            Bytes::copy_from_slice(key.as_ref()),
114            Bytes::copy_from_slice(value.as_ref()),
115            ttl_secs,
116        );
117        commands.push(client_command_insert);
118
119        let request = ClientWriteRequest {
120            client_id: client_inner.client_id,
121            commands,
122        };
123
124        let mut client = self.make_leader_client().await?;
125        // Send write request
126        match client.handle_client_write(request).await {
127            Ok(response) => {
128                debug!("[:KvClient:put_with_ttl] response: {:?}", response);
129                let client_response = response.get_ref();
130                client_response.validate_error()
131            }
132            Err(status) => {
133                error!("[:KvClient:put_with_ttl] status: {:?}", status);
134                Err(status.into())
135            }
136        }
137    }
138
139    /// Deletes a key with strong consistency guarantees
140    ///
141    /// Permanently removes the specified key and its associated value from the store.
142    ///
143    /// # Parameters
144    /// - `key`: The byte-serialized key to delete. Supports any type implementing `AsRef<[u8]>`
145    ///   (e.g. `String`, `&str`, `Vec<u8>`)
146    ///
147    /// # Errors
148    /// - [`crate::ClientApiError::Network`] if unable to reach the leader node
149    /// - [`crate::ClientApiError::Protocol`] for protocol errors
150    /// - [`crate::ClientApiError::Storage`] for server-side storage errors
151    pub async fn delete(
152        &self,
153        key: impl AsRef<[u8]>,
154    ) -> std::result::Result<(), ClientApiError> {
155        let client_inner = self.client_inner.load();
156        // Build request
157        let mut commands = Vec::new();
158        let client_command_delete = WriteCommand::delete(Bytes::copy_from_slice(key.as_ref()));
159        commands.push(client_command_delete);
160
161        let request = ClientWriteRequest {
162            client_id: client_inner.client_id,
163            commands,
164        };
165
166        let mut client = self.make_leader_client().await?;
167
168        // Send delete request
169        match client.handle_client_write(request).await {
170            Ok(response) => {
171                debug!("[:KvClient:delete] response: {:?}", response);
172                let client_response = response.get_ref();
173                client_response.validate_error()
174            }
175            Err(status) => {
176                error!("[:KvClient:delete] status: {:?}", status);
177                Err(status.into())
178            }
179        }
180    }
181
182    // Convenience methods for explicit consistency levels
183    pub async fn get_linearizable(
184        &self,
185        key: impl AsRef<[u8]>,
186    ) -> std::result::Result<Option<ClientResult>, ClientApiError> {
187        self.get_with_policy(key, Some(ReadConsistencyPolicy::LinearizableRead)).await
188    }
189
190    pub async fn get_lease(
191        &self,
192        key: impl AsRef<[u8]>,
193    ) -> std::result::Result<Option<ClientResult>, ClientApiError> {
194        self.get_with_policy(key, Some(ReadConsistencyPolicy::LeaseRead)).await
195    }
196
197    pub async fn get_eventual(
198        &self,
199        key: impl AsRef<[u8]>,
200    ) -> std::result::Result<Option<ClientResult>, ClientApiError> {
201        self.get_with_policy(key, Some(ReadConsistencyPolicy::EventualConsistency))
202            .await
203    }
204
205    /// Retrieves a single key's value using server's default consistency policy
206    ///
207    /// Uses the cluster's configured default consistency policy as defined in
208    /// the server's ReadConsistencyConfig.default_policy setting.
209    ///
210    /// # Parameters
211    /// * `key` - The key to retrieve, accepts any type implementing `AsRef<[u8]>`
212    ///
213    /// # Returns
214    /// * `Ok(Some(ClientResult))` - Key exists, returns key-value pair
215    /// * `Ok(None)` - Key does not exist in the store
216    /// * `Err(ClientApiError)` - Read failed due to network or consistency issues
217    pub async fn get(
218        &self,
219        key: impl AsRef<[u8]>,
220    ) -> std::result::Result<Option<ClientResult>, ClientApiError> {
221        self.get_with_policy(key, None).await
222    }
223
224    /// Retrieves a single key's value with explicit consistency policy
225    ///
226    /// Allows client to override server's default consistency policy for this specific request.
227    /// If server's allow_client_override is false, the override will be ignored.
228    ///
229    /// # Parameters
230    /// * `key` - The key to retrieve, accepts any type implementing `AsRef<[u8]>`
231    /// * `policy` - Explicit consistency policy for this request
232    pub async fn get_with_policy(
233        &self,
234        key: impl AsRef<[u8]>,
235        consistency_policy: Option<ReadConsistencyPolicy>,
236    ) -> std::result::Result<Option<ClientResult>, ClientApiError> {
237        // Delegate to multi-get implementation
238        let mut results =
239            self.get_multi_with_policy(std::iter::once(key), consistency_policy).await?;
240
241        // Extract single result (safe due to single-key input)
242        Ok(results.pop().unwrap_or(None))
243    }
244
245    /// Fetches multiple keys using server's default consistency policy
246    ///
247    /// Uses the cluster's configured default consistency policy as defined in
248    /// the server's ReadConsistencyConfig.default_policy setting.
249    pub async fn get_multi(
250        &self,
251        keys: impl IntoIterator<Item = impl AsRef<[u8]>>,
252    ) -> std::result::Result<Vec<Option<ClientResult>>, ClientApiError> {
253        self.get_multi_with_policy(keys, None).await
254    }
255
256    /// Fetches multiple keys with explicit consistency policy override
257    ///
258    /// Allows client to override server's default consistency policy for this batch request.
259    /// If server's allow_client_override is false, the override will be ignored.
260    pub async fn get_multi_with_policy(
261        &self,
262        keys: impl IntoIterator<Item = impl AsRef<[u8]>>,
263        consistency_policy: Option<ReadConsistencyPolicy>,
264    ) -> std::result::Result<Vec<Option<ClientResult>>, ClientApiError> {
265        let _timer = ScopedTimer::new("client::get_multi");
266
267        let client_inner = self.client_inner.load();
268        // Convert keys to commands
269        let keys: Vec<Bytes> =
270            keys.into_iter().map(|k| Bytes::copy_from_slice(k.as_ref())).collect();
271
272        // Validate at least one key
273        if keys.is_empty() {
274            warn!("Attempted multi-get with empty key collection");
275            return Err(ErrorCode::InvalidRequest.into());
276        }
277
278        // Build request
279        let request = ClientReadRequest {
280            client_id: client_inner.client_id,
281            keys,
282            consistency_policy: consistency_policy.map(|p| p as i32),
283        };
284
285        // Select client based on policy (if specified)
286        let mut client = match consistency_policy {
287            Some(ReadConsistencyPolicy::LinearizableRead)
288            | Some(ReadConsistencyPolicy::LeaseRead) => {
289                debug!("Using leader client for explicit consistency policy");
290                self.make_leader_client().await?
291            }
292            Some(ReadConsistencyPolicy::EventualConsistency) | None => {
293                debug!("Using load-balanced client for cluster default policy");
294                self.make_client().await?
295            }
296        };
297
298        // Execute request
299        match client.handle_client_read(request).await {
300            Ok(response) => {
301                debug!("Read response: {:?}", response);
302                response.into_inner().into_read_results()
303            }
304            Err(status) => {
305                error!("Read request failed: {:?}", status);
306                Err(status.into())
307            }
308        }
309    }
310
311    /// Watch for changes on a specific key
312    ///
313    /// Returns a stream of watch events whenever the specified key is modified (PUT or DELETE).
314    /// The stream will continue until the client drops the receiver or disconnects.
315    ///
316    /// # Arguments
317    /// * `key` - The exact key to watch (prefix/range watch not supported in v1)
318    ///
319    /// # Returns
320    /// * `Ok(Streaming<WatchResponse>)` - Stream of watch events
321    /// * `Err(ClientApiError)` - If watch feature is disabled or connection fails
322    ///
323    /// # Example
324    /// ```rust,ignore
325    /// use futures::StreamExt;
326    ///
327    /// let mut stream = client.kv().watch("my_key").await?;
328    /// while let Some(event) = stream.next().await {
329    ///     match event {
330    ///         Ok(response) => println!("Key changed: {:?}", response),
331    ///         Err(e) => eprintln!("Watch error: {:?}", e),
332    ///     }
333    /// }
334    /// ```
335    pub async fn watch(
336        &self,
337        key: impl AsRef<[u8]>,
338    ) -> std::result::Result<tonic::Streaming<WatchResponse>, ClientApiError> {
339        let client_inner = self.client_inner.load();
340
341        let request = WatchRequest {
342            client_id: client_inner.client_id,
343            key: Bytes::copy_from_slice(key.as_ref()),
344        };
345
346        // Watch can connect to any node (leader or follower)
347        let mut client = self.make_client().await?;
348
349        match client.watch(request).await {
350            Ok(response) => {
351                debug!("Watch stream established");
352                Ok(response.into_inner())
353            }
354            Err(status) => {
355                error!("Watch request failed: {:?}", status);
356                Err(status.into())
357            }
358        }
359    }
360
361    async fn make_leader_client(
362        &self
363    ) -> std::result::Result<RaftClientServiceClient<Channel>, ClientApiError> {
364        let client_inner = self.client_inner.load();
365
366        let channel = client_inner.pool.get_leader();
367        let mut client = RaftClientServiceClient::new(channel);
368        if client_inner.pool.config.enable_compression {
369            client = client
370                .send_compressed(CompressionEncoding::Gzip)
371                .accept_compressed(CompressionEncoding::Gzip);
372        }
373
374        Ok(client)
375    }
376
377    pub(super) async fn make_client(
378        &self
379    ) -> std::result::Result<RaftClientServiceClient<Channel>, ClientApiError> {
380        let client_inner = self.client_inner.load();
381
382        // Balance from read clients
383        let mut rng = StdRng::from_entropy();
384        let channels = client_inner.pool.get_all_channels();
385        let i = rng.gen_range(0..channels.len());
386
387        let mut client = RaftClientServiceClient::new(channels[i].clone());
388
389        if client_inner.pool.config.enable_compression {
390            client = client
391                .send_compressed(CompressionEncoding::Gzip)
392                .accept_compressed(CompressionEncoding::Gzip);
393        }
394
395        Ok(client)
396    }
397}
398
399// ==================== Core KvClient Trait Implementation ====================
400
401// Implement d_engine_core::KvClient trait for GrpcKvClient
402#[async_trait::async_trait]
403impl CoreKvClient for GrpcKvClient {
404    async fn put(
405        &self,
406        key: impl AsRef<[u8]> + Send,
407        value: impl AsRef<[u8]> + Send,
408    ) -> KvResult<()> {
409        GrpcKvClient::put(self, key, value).await.map_err(Into::into)
410    }
411
412    async fn put_with_ttl(
413        &self,
414        key: impl AsRef<[u8]> + Send,
415        value: impl AsRef<[u8]> + Send,
416        ttl_secs: u64,
417    ) -> KvResult<()> {
418        GrpcKvClient::put_with_ttl(self, key, value, ttl_secs).await.map_err(Into::into)
419    }
420
421    async fn get(
422        &self,
423        key: impl AsRef<[u8]> + Send,
424    ) -> KvResult<Option<Bytes>> {
425        match GrpcKvClient::get(self, key).await {
426            Ok(Some(result)) => Ok(Some(result.value)),
427            Ok(None) => Ok(None),
428            Err(e) => Err(e.into()),
429        }
430    }
431
432    async fn get_multi(
433        &self,
434        keys: &[Bytes],
435    ) -> KvResult<Vec<Option<Bytes>>> {
436        match GrpcKvClient::get_multi(self, keys.iter().cloned()).await {
437            Ok(results) => Ok(results.into_iter().map(|opt| opt.map(|r| r.value)).collect()),
438            Err(e) => Err(e.into()),
439        }
440    }
441
442    async fn delete(
443        &self,
444        key: impl AsRef<[u8]> + Send,
445    ) -> KvResult<()> {
446        GrpcKvClient::delete(self, key).await.map_err(Into::into)
447    }
448}