d_engine/client/
kv.rs

1use std::sync::Arc;
2
3use arc_swap::ArcSwap;
4use rand::rngs::StdRng;
5use rand::Rng;
6use rand::SeedableRng;
7use tonic::codec::CompressionEncoding;
8use tonic::transport::Channel;
9use tracing::debug;
10use tracing::error;
11
12use super::ClientInner;
13use crate::proto::client::raft_client_service_client::RaftClientServiceClient;
14use crate::proto::client::ClientReadRequest;
15use crate::proto::client::ClientResult;
16use crate::proto::client::ClientWriteRequest;
17use crate::proto::client::WriteCommand;
18use crate::proto::error::ErrorCode;
19use crate::scoped_timer::ScopedTimer;
20use crate::ClientApiError;
21
22/// Key-value store client interface
23///
24/// Implements CRUD operations with configurable consistency levels.
25/// All write operations use strong consistency.
26#[derive(Clone)]
27pub struct KvClient {
28    pub(super) client_inner: Arc<ArcSwap<ClientInner>>,
29}
30
31impl KvClient {
32    pub(crate) fn new(client_inner: Arc<ArcSwap<ClientInner>>) -> Self {
33        Self { client_inner }
34    }
35
36    /// Stores a value with strong consistency
37    ///
38    /// # Errors
39    /// - [`crate::ClientApiError::Network`] on network failures
40    /// - [`crate::ClientApiError::InvalidResponse`] for malformed server responses
41    pub async fn put(
42        &self,
43        key: impl AsRef<[u8]>,
44        value: impl AsRef<[u8]>,
45    ) -> std::result::Result<(), ClientApiError> {
46        let _timer = ScopedTimer::new("client::put");
47
48        let client_inner = self.client_inner.load();
49
50        // Build request
51        let mut commands = Vec::new();
52        let client_command_insert = WriteCommand::insert(key, value);
53        commands.push(client_command_insert);
54
55        let request = ClientWriteRequest {
56            client_id: client_inner.client_id,
57            commands,
58        };
59
60        let mut client = self.make_leader_client().await?;
61        // Send write request
62        match client.handle_client_write(request).await {
63            Ok(response) => {
64                debug!("[:KvClient:write] response: {:?}", response);
65                let client_response = response.get_ref();
66                client_response.validate_error()
67            }
68            Err(status) => {
69                error!("[:KvClient:write] status: {:?}", status);
70                Err(status.into())
71            }
72        }
73    }
74
75    /// Deletes a key with strong consistency guarantees
76    ///
77    /// Permanently removes the specified key and its associated value from the store.
78    ///
79    /// # Parameters
80    /// - `key`: The byte-serialized key to delete. Supports any type implementing `AsRef<[u8]>`
81    ///   (e.g. `String`, `&str`, `Vec<u8>`)
82    ///
83    /// # Errors
84    /// - [`Error::FailedToSendWriteRequestError`] if unable to reach the leader node
85    /// - [`Error::InvalidResponse`] for malformed server responses
86    pub async fn delete(
87        &self,
88        key: impl AsRef<[u8]>,
89    ) -> std::result::Result<(), ClientApiError> {
90        let client_inner = self.client_inner.load();
91        // Build request
92        let mut commands = Vec::new();
93        let client_command_insert = WriteCommand::delete(key);
94        commands.push(client_command_insert);
95
96        let request = ClientWriteRequest {
97            client_id: client_inner.client_id,
98            commands,
99        };
100
101        let mut client = self.make_leader_client().await?;
102
103        // Send delete request
104        match client.handle_client_write(request).await {
105            Ok(response) => {
106                debug!("[:KvClient:delete] response: {:?}", response);
107                let client_response = response.get_ref();
108                client_response.validate_error()
109            }
110            Err(status) => {
111                error!("[:KvClient:delete] status: {:?}", status);
112                Err(status.into())
113            }
114        }
115    }
116
117    /// Retrieves a single key's value from the cluster
118    ///
119    /// # Parameters
120    /// - `key`: The key to retrieve, accepts any byte slice compatible type
121    /// - `linear`: Whether to use linearizable read consistency
122    ///
123    /// # Returns
124    /// - `Ok(Some(ClientResult))` if key exists
125    /// - `Ok(None)` if key not found
126    /// - `Err` on network failures or invalid responses
127    pub async fn get(
128        &self,
129        key: impl AsRef<[u8]>,
130        linear: bool,
131    ) -> std::result::Result<Option<ClientResult>, ClientApiError> {
132        // Delegate to multi-get implementation
133        let mut results = self.get_multi(std::iter::once(key), linear).await?;
134
135        // Extract single result (safe due to single-key input)
136        Ok(results.pop().unwrap_or(None))
137    }
138    /// Fetches values for multiple keys from the cluster
139    ///
140    /// # Parameters
141    /// - `keys`: Iterable collection of keys to fetch
142    /// - `linear`: Whether to use linearizable read consistency
143    ///
144    /// # Returns
145    /// Ordered list of results matching input keys. Missing keys return `None`.
146    ///
147    /// # Errors
148    /// - `Error::EmptyKeys` if no keys provided
149    /// - `Error::FailedToSendReadRequestError` on network failures
150    pub async fn get_multi(
151        &self,
152        keys: impl IntoIterator<Item = impl AsRef<[u8]>>,
153        linear: bool,
154    ) -> std::result::Result<Vec<Option<ClientResult>>, ClientApiError> {
155        let client_inner = self.client_inner.load();
156        // Convert keys to commands
157        let keys: Vec<Vec<u8>> = keys.into_iter().map(|k| k.as_ref().to_vec()).collect();
158
159        // Validate at least one key
160        if keys.is_empty() {
161            return Err(ErrorCode::InvalidRequest.into());
162        }
163
164        // Select client based on consistency level
165        let mut client = if linear {
166            self.make_leader_client().await?
167        } else {
168            self.make_client().await?
169        };
170
171        // Build request
172        let request = ClientReadRequest {
173            client_id: client_inner.client_id,
174            linear,
175            keys,
176        };
177
178        // Execute request
179        match client.handle_client_read(request).await {
180            Ok(response) => {
181                debug!("Read response: {:?}", response);
182                response.into_inner().into_read_results()
183            }
184            Err(status) => {
185                error!("Read request failed: {:?}", status);
186                Err(status.into())
187            }
188        }
189    }
190
191    async fn make_leader_client(
192        &self
193    ) -> std::result::Result<RaftClientServiceClient<Channel>, ClientApiError> {
194        let client_inner = self.client_inner.load();
195
196        let channel = client_inner.pool.get_leader();
197        let mut client = RaftClientServiceClient::new(channel);
198        if client_inner.pool.config.enable_compression {
199            client = client
200                .send_compressed(CompressionEncoding::Gzip)
201                .accept_compressed(CompressionEncoding::Gzip);
202        }
203
204        Ok(client)
205    }
206
207    pub(super) async fn make_client(
208        &self
209    ) -> std::result::Result<RaftClientServiceClient<Channel>, ClientApiError> {
210        let client_inner = self.client_inner.load();
211
212        // Balance from read clients
213        let mut rng = StdRng::from_entropy();
214        let channels = client_inner.pool.get_all_channels();
215        let i = rng.gen_range(0..channels.len());
216
217        let mut client = RaftClientServiceClient::new(channels[i].clone());
218
219        if client_inner.pool.config.enable_compression {
220            client = client
221                .send_compressed(CompressionEncoding::Gzip)
222                .accept_compressed(CompressionEncoding::Gzip);
223        }
224
225        Ok(client)
226    }
227}