d_engine/client/
kv.rs

1use log::debug;
2use log::error;
3use rand::rngs::StdRng;
4use rand::Rng;
5use rand::SeedableRng;
6use tonic::codec::CompressionEncoding;
7use tonic::transport::Channel;
8
9use super::ConnectionPool;
10use crate::proto::rpc_service_client::RpcServiceClient;
11use crate::proto::ClientCommand;
12use crate::proto::ClientProposeRequest;
13use crate::proto::ClientReadRequest;
14use crate::proto::ClientRequestError;
15use crate::proto::ClientResponse;
16use crate::proto::ClientResult;
17use crate::Error;
18use crate::Result;
19
20/// Key-value store client interface
21///
22/// Implements CRUD operations with configurable consistency levels.
23/// All write operations use strong consistency.
24pub struct KvClient {
25    client_id: u32,
26    pool: ConnectionPool,
27}
28
29impl KvClient {
30    pub(crate) fn new(
31        client_id: u32,
32        pool: ConnectionPool,
33    ) -> Self {
34        Self { client_id, pool }
35    }
36
37    // Stores a value with strong consistency
38    ///
39    /// # Errors
40    /// - [`Error::FailedToSendWriteRequestError`] on network failures
41    /// - [`Error::InvalidResponse`] for malformed server responses
42    pub async fn put(
43        &self,
44        key: impl AsRef<[u8]>,
45        value: impl AsRef<[u8]>,
46    ) -> Result<()> {
47        // Build request
48        let mut commands = Vec::new();
49        let client_command_insert = ClientCommand::insert(key, value);
50        commands.push(client_command_insert);
51
52        let request = ClientProposeRequest {
53            client_id: self.client_id,
54            commands,
55        };
56
57        let mut client = self.make_leader_client().await?;
58        // Send write request
59        match client.handle_client_propose(request).await {
60            Ok(response) => {
61                debug!("[:KvClient:write] response: {:?}", response);
62                match response.get_ref() {
63                    ClientResponse { error_code, result: _ } => {
64                        if matches!(
65                            ClientRequestError::try_from(*error_code).unwrap_or(ClientRequestError::NoError),
66                            ClientRequestError::NoError
67                        ) {
68                            return Ok(());
69                        } else {
70                            error!("handle_client_propose error_code:{:?}", error_code);
71                        }
72                    }
73                }
74            }
75            Err(status) => {
76                error!("[:KvClient:write] status: {:?}", status);
77            }
78        }
79        Err(Error::FailedToSendWriteRequestError)
80    }
81
82    /// Deletes a key with strong consistency guarantees
83    ///
84    /// Permanently removes the specified key and its associated value from the store.
85    ///
86    /// # Parameters
87    /// - `key`: The byte-serialized key to delete. Supports any type implementing `AsRef<[u8]>`
88    ///   (e.g. `String`, `&str`, `Vec<u8>`)
89    ///
90    /// # Errors
91    /// - [`Error::FailedToSendWriteRequestError`] if unable to reach the leader node
92    /// - [`Error::InvalidResponse`] for malformed server responses
93    pub async fn delete(
94        &self,
95        key: impl AsRef<[u8]>,
96    ) -> Result<()> {
97        // Build request
98        let mut commands = Vec::new();
99        let client_command_insert = ClientCommand::delete(key);
100        commands.push(client_command_insert);
101
102        let request = ClientProposeRequest {
103            client_id: self.client_id,
104            commands,
105        };
106
107        let mut client = self.make_leader_client().await?;
108
109        // Send delete request
110        match client.handle_client_propose(request).await {
111            Ok(response) => {
112                debug!("[:KvClient:delete] response: {:?}", response);
113
114                return Ok(());
115            }
116            Err(status) => {
117                error!("[:KvClient:delete] status: {:?}", status);
118            }
119        }
120        Err(Error::FailedToSendWriteRequestError)
121    }
122
123    /// Retrieves a single key's value from the cluster
124    ///
125    /// # Parameters
126    /// - `key`: The key to retrieve, accepts any byte slice compatible type
127    /// - `linear`: Whether to use linearizable read consistency
128    ///
129    /// # Returns
130    /// - `Ok(Some(ClientResult))` if key exists
131    /// - `Ok(None)` if key not found
132    /// - `Err` on network failures or invalid responses
133    pub async fn get(
134        &self,
135        key: impl AsRef<[u8]>,
136        linear: bool,
137    ) -> Result<Option<ClientResult>> {
138        // Delegate to multi-get implementation
139        let mut results = self.get_multi(std::iter::once(key), linear).await?;
140
141        // Extract single result (safe due to single-key input)
142        results.pop().ok_or_else(|| {
143            error!("Internal error: empty results from single-key read");
144            Error::InvalidResponse
145        })
146    }
147    /// Fetches values for multiple keys from the cluster
148    ///
149    /// # Parameters
150    /// - `keys`: Iterable collection of keys to fetch
151    /// - `linear`: Whether to use linearizable read consistency
152    ///
153    /// # Returns
154    /// Ordered list of results matching input keys. Missing keys return `None`.
155    ///
156    /// # Errors
157    /// - `Error::EmptyKeys` if no keys provided
158    /// - `Error::FailedToSendReadRequestError` on network failures
159    pub async fn get_multi(
160        &self,
161        keys: impl IntoIterator<Item = impl AsRef<[u8]>>,
162        linear: bool,
163    ) -> Result<Vec<Option<ClientResult>>> {
164        // Convert keys to commands
165        let commands: Vec<ClientCommand> = keys.into_iter().map(|k| ClientCommand::get(k.as_ref())).collect();
166
167        // Validate at least one key
168        if commands.is_empty() {
169            return Err(Error::EmptyKeys);
170        }
171
172        // Select client based on consistency level
173        let mut client = if linear {
174            self.make_leader_client().await?
175        } else {
176            self.make_client().await?
177        };
178
179        // Build request
180        let request = ClientReadRequest {
181            client_id: self.client_id,
182            linear,
183            commands,
184        };
185
186        // Execute request
187        match client.handle_client_read(request).await {
188            Ok(response) => {
189                debug!("Read response: {:?}", response);
190                response.into_inner().into_read_results()
191            }
192            Err(status) => {
193                error!("Read request failed: {:?}", status);
194                Err(Error::FailedToSendReadRequestError)
195            }
196        }
197    }
198
199    async fn make_leader_client(&self) -> Result<RpcServiceClient<Channel>> {
200        let channel = self.pool.get_leader();
201        let mut client = RpcServiceClient::new(channel);
202        if self.pool.config.enable_compression {
203            client = client
204                .send_compressed(CompressionEncoding::Gzip)
205                .accept_compressed(CompressionEncoding::Gzip);
206        }
207
208        Ok(client)
209    }
210
211    async fn make_client(&self) -> Result<RpcServiceClient<Channel>> {
212        // Balance from read clients
213        let mut rng = StdRng::from_entropy();
214        let channels = self.pool.get_all_channels();
215        let i = rng.gen_range(0..channels.len());
216
217        let mut client = RpcServiceClient::new(channels[i].clone());
218
219        if self.pool.config.enable_compression {
220            client = client
221                .send_compressed(CompressionEncoding::Gzip)
222                .accept_compressed(CompressionEncoding::Gzip);
223        }
224
225        Ok(client)
226    }
227}