d_engine/client/mod.rs
1//! Client module for distributed consensus system
2//!
3//! Provides core components for interacting with the d_engine cluster:
4//! - [`Client`] - Main entry point with cluster access
5//! - [`ClientBuilder`] - Configurable client construction
6//! - [`KvClient`] - Key-value store operations
7//! - [`ClusterClient`] - Cluster management operations
8//! - [`ConnectionPool`] - Underlying connection management
9//!
10//! # Basic Usage
11//! ```no_run
12//! use d_engine::client::{Client, ClientBuilder};
13//! use std::time::Duration;
14//! use core::error::Error;
15//!
16//! #[tokio::main(flavor = "current_thread")]
17//! async fn main(){
18//! // Initialize client with automatic cluster discovery
19//! let client = Client::builder(vec![
20//! "http://node1:9081".into(),
21//! "http://node2:9082".into()
22//! ])
23//! .connect_timeout(Duration::from_secs(3))
24//! .request_timeout(Duration::from_secs(1))
25//! .enable_compression(true)
26//! .build()
27//! .await
28//! .unwrap();
29//!
30//! // Execute key-value operations
31//! client.kv().put("user:1001", "Alice").await.unwrap();
32//!
33//! let value = client.kv().get("user:1001").await.unwrap();
34//!
35//! println!("User data: {:?}", value);
36//!
37//! // Perform cluster management
38//! let members = client.cluster().list_members().await.unwrap();
39//! println!("Cluster members: {:?}", members);
40//!
41//! }
42//! ```
43
44mod builder;
45mod cluster;
46mod config;
47mod error;
48mod kv;
49mod pool;
50
51pub use builder::*;
52use bytes::Bytes;
53pub use cluster::*;
54pub use config::*;
55pub use error::*;
56pub use kv::*;
57pub use pool::*;
58
59#[cfg(test)]
60mod cluster_test;
61#[cfg(test)]
62mod kv_test;
63#[cfg(test)]
64mod pool_test;
65
66use std::sync::Arc;
67
68use arc_swap::ArcSwap;
69use tracing::error;
70
71use crate::proto::client::client_response::SuccessResult;
72use crate::proto::client::write_command;
73use crate::proto::client::ClientResponse;
74use crate::proto::client::ClientResult;
75use crate::proto::client::ReadResults;
76use crate::proto::client::WriteCommand;
77use crate::proto::error::ErrorCode;
78
79/// Main entry point for interacting with the d_engine cluster
80///
81/// Manages connections and provides access to specialized clients:
82/// - Use [`kv()`](Client::kv) for data operations
83/// - Use [`cluster()`](Client::cluster) for cluster administration
84///
85/// Created through the [`builder()`](Client::builder) method
86#[derive(Clone)]
87pub struct Client {
88 /// Key-value store client interface
89 pub(super) kv: KvClient,
90
91 /// Cluster management client interface
92 pub(super) cluster: ClusterClient,
93
94 pub(super) inner: Arc<ArcSwap<ClientInner>>,
95}
96
97#[derive(Clone)]
98pub struct ClientInner {
99 pub(super) pool: ConnectionPool,
100 pub(super) client_id: u32,
101 pub(super) config: ClientConfig,
102 pub(super) endpoints: Vec<String>,
103}
104
105impl Client {
106 /// Access the key-value operations client
107 ///
108 /// # Examples
109 /// ```rust,ignore
110 /// client.kv().put("key", "value").await?;
111 /// ```
112 pub fn kv(&self) -> &KvClient {
113 &self.kv
114 }
115
116 /// Access the cluster management client
117 ///
118 /// # Examples
119 /// ```rust,ignore
120 /// client.cluster().add_node("node3:9083").await?;
121 /// ```
122 pub fn cluster(&self) -> &ClusterClient {
123 &self.cluster
124 }
125
126 /// Create a configured client builder
127 ///
128 /// Starts client construction process with specified bootstrap endpoints.
129 /// Chain configuration methods before calling
130 /// [`build()`](ClientBuilder::build).
131 ///
132 /// # Arguments
133 /// * `endpoints` - Initial cluster nodes for discovery
134 ///
135 /// # Panics
136 /// Will panic if no valid endpoints provided
137 pub fn builder(endpoints: Vec<String>) -> ClientBuilder {
138 assert!(!endpoints.is_empty(), "At least one endpoint required");
139 ClientBuilder::new(endpoints)
140 }
141
142 pub async fn refresh(
143 &mut self,
144 new_endpoints: Option<Vec<String>>,
145 ) -> std::result::Result<(), ClientApiError> {
146 // Get a writable lock
147 let old_inner = self.inner.load();
148 let config = old_inner.config.clone();
149 let endpoints = new_endpoints.unwrap_or(old_inner.endpoints.clone());
150
151 let new_pool = ConnectionPool::create(endpoints.clone(), config.clone()).await?;
152
153 let new_inner = Arc::new(ClientInner {
154 pool: new_pool,
155 client_id: old_inner.client_id,
156 config,
157 endpoints,
158 });
159
160 self.inner.store(new_inner);
161 Ok(())
162 }
163}
164
165impl WriteCommand {
166 /// Create write command for key-value pair
167 ///
168 /// # Parameters
169 /// - `key`: Byte array for storage key
170 /// - `value`: Byte array to be stored
171 pub fn insert(
172 key: impl Into<Bytes>,
173 value: impl Into<Bytes>,
174 ) -> Self {
175 let cmd = write_command::Insert {
176 key: key.into(),
177 value: value.into(),
178 };
179 Self {
180 operation: Some(write_command::Operation::Insert(cmd)),
181 }
182 }
183
184 /// Create deletion command for specified key
185 ///
186 /// # Parameters
187 /// - `key`: Byte array of key to delete
188 pub fn delete(key: impl Into<Bytes>) -> Self {
189 let cmd = write_command::Delete { key: key.into() };
190 Self {
191 operation: Some(write_command::Operation::Delete(cmd)),
192 }
193 }
194}
195impl ClientResponse {
196 /// Build success response for write operations
197 ///
198 /// # Returns
199 /// Response with NoError code and write confirmation
200 pub fn write_success() -> Self {
201 Self {
202 error: ErrorCode::Success as i32,
203 success_result: Some(SuccessResult::WriteAck(true)),
204 metadata: None,
205 }
206 }
207
208 /// Check if the write operation was successful
209 ///
210 /// # Returns
211 /// - `true` if the response indicates a successful write operation
212 /// - `false` if the response indicates a failed write operation or is not a write response
213 pub fn is_write_success(&self) -> bool {
214 self.error == ErrorCode::Success as i32
215 && matches!(self.success_result, Some(SuccessResult::WriteAck(true)))
216 }
217
218 /// Build success response for read operations
219 ///
220 /// # Parameters
221 /// - `results`: Vector of retrieved key-value pairs
222 pub fn read_results(results: Vec<ClientResult>) -> Self {
223 Self {
224 error: ErrorCode::Success as i32,
225 success_result: Some(SuccessResult::ReadData(ReadResults { results })),
226 metadata: None,
227 }
228 }
229
230 /// Build generic error response for any operation type
231 ///
232 /// # Parameters
233 /// - `error_code`: Predefined client request error code
234 pub fn client_error(error_code: ErrorCode) -> Self {
235 Self {
236 error: error_code as i32,
237 success_result: None,
238 metadata: None,
239 }
240 }
241
242 /// Convert response to boolean write result
243 ///
244 /// # Returns
245 /// - `Ok(true)` on successful write
246 /// - `Err` with converted error code on failure
247 pub fn into_write_result(&self) -> std::result::Result<bool, ClientApiError> {
248 self.validate_error()?;
249 Ok(match self.success_result {
250 Some(SuccessResult::WriteAck(success)) => success,
251 _ => false,
252 })
253 }
254
255 /// Convert response to read results
256 ///
257 /// # Returns
258 /// Vector of optional key-value pairs wrapped in Result
259 pub fn into_read_results(
260 &self
261 ) -> std::result::Result<Vec<Option<ClientResult>>, ClientApiError> {
262 self.validate_error()?;
263 match &self.success_result {
264 Some(SuccessResult::ReadData(data)) => data
265 .results
266 .clone()
267 .into_iter()
268 .map(|item| {
269 Ok(Some(ClientResult {
270 key: item.key,
271 value: item.value,
272 }))
273 })
274 .collect(),
275 _ => {
276 error!("Invalid response type for read operation");
277 unreachable!()
278 }
279 }
280 }
281
282 /// Validate error code in response header
283 ///
284 /// # Internal Logic
285 /// Converts numeric error code to enum variant
286 pub(crate) fn validate_error(&self) -> std::result::Result<(), ClientApiError> {
287 match ErrorCode::try_from(self.error).unwrap_or(ErrorCode::Uncategorized) {
288 ErrorCode::Success => Ok(()),
289 e => Err(e.into()),
290 }
291 }
292
293 /// Check if this response indicates the leader's term is outdated
294 pub fn is_term_outdated(&self) -> bool {
295 ErrorCode::try_from(self.error).map(|e| e.is_term_outdated()).unwrap_or(false)
296 }
297
298 /// Check if this response indicates a quorum timeout or failure to receive majority responses
299 pub fn is_quorum_timeout_or_failure(&self) -> bool {
300 ErrorCode::try_from(self.error)
301 .map(|e| e.is_quorum_timeout_or_failure())
302 .unwrap_or(false)
303 }
304
305 /// Check if this response indicates a failure to receive majority responses
306 pub fn is_propose_failure(&self) -> bool {
307 ErrorCode::try_from(self.error).map(|e| e.is_propose_failure()).unwrap_or(false)
308 }
309
310 /// Check if this response indicates a a retry required
311 pub fn is_retry_required(&self) -> bool {
312 ErrorCode::try_from(self.error).map(|e| e.is_retry_required()).unwrap_or(false)
313 }
314}
315
316impl ErrorCode {
317 /// Check if this error indicates the leader's term is outdated
318 pub(crate) fn is_term_outdated(&self) -> bool {
319 matches!(self, ErrorCode::TermOutdated)
320 }
321
322 /// Check if this error indicates a quorum timeout or failure to receive majority responses
323 pub(crate) fn is_quorum_timeout_or_failure(&self) -> bool {
324 matches!(
325 self,
326 ErrorCode::ConnectionTimeout | ErrorCode::ProposeFailed | ErrorCode::ClusterUnavailable
327 )
328 }
329
330 /// Check if this error indicates a failure to receive majority responses
331 pub(crate) fn is_propose_failure(&self) -> bool {
332 matches!(self, ErrorCode::ProposeFailed)
333 }
334
335 /// Check if this error indicates a retry required
336 pub(crate) fn is_retry_required(&self) -> bool {
337 matches!(self, ErrorCode::RetryRequired)
338 }
339}