d_engine/client/mod.rs
1//! Client module for distributed consensus system
2//!
3//! Provides core components for interacting with the d_engine cluster:
4//! - [`Client`] - Main entry point with cluster access
5//! - [`ClientBuilder`] - Configurable client construction
6//! - [`KvClient`] - Key-value store operations
7//! - [`ClusterClient`] - Cluster management operations
8//! - [`ConnectionPool`] - Underlying connection management
9//!
10//! # Basic Usage
11//! ```no_run
12//! use d_engine::client::{Client, ClientBuilder};
13//! use std::time::Duration;
14//! use core::error::Error;
15//!
16//! #[tokio::main(flavor = "current_thread")]
17//! async fn main(){
18//! // Initialize client with automatic cluster discovery
19//! let client = Client::builder(vec![
20//! "http://node1:9081".into(),
21//! "http://node2:9082".into()
22//! ])
23//! .connect_timeout(Duration::from_secs(3))
24//! .request_timeout(Duration::from_secs(1))
25//! .enable_compression(true)
26//! .build()
27//! .await
28//! .unwrap();
29//!
30//! // Execute key-value operations
31//! client.kv().put("user:1001", "Alice").await.unwrap();
32//!
33//! let value = client.kv().get("user:1001", false).await.unwrap();
34//!
35//! println!("User data: {:?}", value);
36//!
37//! // Perform cluster management
38//! let members = client.cluster().list_members().await.unwrap();
39//! println!("Cluster members: {:?}", members);
40//!
41//! }
42//! ```
43
44mod builder;
45mod cluster;
46mod config;
47mod error;
48mod kv;
49mod pool;
50
51pub use builder::*;
52pub use cluster::*;
53pub use config::*;
54pub use error::*;
55pub use kv::*;
56pub use pool::*;
57
58#[cfg(test)]
59mod cluster_test;
60#[cfg(test)]
61mod kv_test;
62#[cfg(test)]
63mod pool_test;
64
65use std::sync::Arc;
66
67use arc_swap::ArcSwap;
68use tracing::error;
69
70use crate::proto::client::client_response::SuccessResult;
71use crate::proto::client::write_command;
72use crate::proto::client::ClientResponse;
73use crate::proto::client::ClientResult;
74use crate::proto::client::ReadResults;
75use crate::proto::client::WriteCommand;
76use crate::proto::error::ErrorCode;
77
78/// Main entry point for interacting with the d_engine cluster
79///
80/// Manages connections and provides access to specialized clients:
81/// - Use [`kv()`](Client::kv) for data operations
82/// - Use [`cluster()`](Client::cluster) for cluster administration
83///
84/// Created through the [`builder()`](Client::builder) method
85#[derive(Clone)]
86pub struct Client {
87 /// Key-value store client interface
88 pub(super) kv: KvClient,
89
90 /// Cluster management client interface
91 pub(super) cluster: ClusterClient,
92
93 pub(super) inner: Arc<ArcSwap<ClientInner>>,
94}
95
96#[derive(Clone)]
97pub struct ClientInner {
98 pub(super) pool: ConnectionPool,
99 pub(super) client_id: u32,
100 pub(super) config: ClientConfig,
101 pub(super) endpoints: Vec<String>,
102}
103
104impl Client {
105 /// Access the key-value operations client
106 ///
107 /// # Examples
108 /// ```rust,ignore
109 /// client.kv().put("key", "value").await?;
110 /// ```
111 pub fn kv(&self) -> &KvClient {
112 &self.kv
113 }
114
115 /// Access the cluster management client
116 ///
117 /// # Examples
118 /// ```rust,ignore
119 /// client.cluster().add_node("node3:9083").await?;
120 /// ```
121 pub fn cluster(&self) -> &ClusterClient {
122 &self.cluster
123 }
124
125 /// Create a configured client builder
126 ///
127 /// Starts client construction process with specified bootstrap endpoints.
128 /// Chain configuration methods before calling
129 /// [`build()`](ClientBuilder::build).
130 ///
131 /// # Arguments
132 /// * `endpoints` - Initial cluster nodes for discovery
133 ///
134 /// # Panics
135 /// Will panic if no valid endpoints provided
136 pub fn builder(endpoints: Vec<String>) -> ClientBuilder {
137 assert!(!endpoints.is_empty(), "At least one endpoint required");
138 ClientBuilder::new(endpoints)
139 }
140
141 pub async fn refresh(
142 &mut self,
143 new_endpoints: Option<Vec<String>>,
144 ) -> std::result::Result<(), ClientApiError> {
145 // Get a writable lock
146 let old_inner = self.inner.load();
147 let config = old_inner.config.clone();
148 let endpoints = new_endpoints.unwrap_or(old_inner.endpoints.clone());
149
150 let new_pool = ConnectionPool::create(endpoints.clone(), config.clone()).await?;
151
152 let new_inner = Arc::new(ClientInner {
153 pool: new_pool,
154 client_id: old_inner.client_id,
155 config,
156 endpoints,
157 });
158
159 self.inner.store(new_inner);
160 Ok(())
161 }
162}
163
164impl WriteCommand {
165 /// Create write command for key-value pair
166 ///
167 /// # Parameters
168 /// - `key`: Byte array for storage key
169 /// - `value`: Byte array to be stored
170 pub fn insert(
171 key: impl AsRef<[u8]>,
172 value: impl AsRef<[u8]>,
173 ) -> Self {
174 let cmd = write_command::Insert {
175 key: key.as_ref().to_vec(),
176 value: value.as_ref().to_vec(),
177 };
178 Self {
179 operation: Some(write_command::Operation::Insert(cmd)),
180 }
181 }
182
183 /// Create deletion command for specified key
184 ///
185 /// # Parameters
186 /// - `key`: Byte array of key to delete
187 pub fn delete(key: impl AsRef<[u8]>) -> Self {
188 let cmd = write_command::Delete {
189 key: key.as_ref().to_vec(),
190 };
191 Self {
192 operation: Some(write_command::Operation::Delete(cmd)),
193 }
194 }
195}
196impl ClientResponse {
197 /// Build success response for write operations
198 ///
199 /// # Returns
200 /// Response with NoError code and write confirmation
201 pub fn write_success() -> Self {
202 Self {
203 error: ErrorCode::Success as i32,
204 success_result: Some(SuccessResult::WriteAck(true)),
205 metadata: None,
206 }
207 }
208
209 /// Check if the write operation was successful
210 ///
211 /// # Returns
212 /// - `true` if the response indicates a successful write operation
213 /// - `false` if the response indicates a failed write operation or is not a write response
214 pub fn is_write_success(&self) -> bool {
215 self.error == ErrorCode::Success as i32
216 && matches!(self.success_result, Some(SuccessResult::WriteAck(true)))
217 }
218
219 /// Build success response for read operations
220 ///
221 /// # Parameters
222 /// - `results`: Vector of retrieved key-value pairs
223 pub fn read_results(results: Vec<ClientResult>) -> Self {
224 Self {
225 error: ErrorCode::Success as i32,
226 success_result: Some(SuccessResult::ReadData(ReadResults { results })),
227 metadata: None,
228 }
229 }
230
231 /// Build generic error response for any operation type
232 ///
233 /// # Parameters
234 /// - `error_code`: Predefined client request error code
235 pub fn client_error(error_code: ErrorCode) -> Self {
236 Self {
237 error: error_code as i32,
238 success_result: None,
239 metadata: None,
240 }
241 }
242
243 /// Convert response to boolean write result
244 ///
245 /// # Returns
246 /// - `Ok(true)` on successful write
247 /// - `Err` with converted error code on failure
248 pub fn into_write_result(&self) -> std::result::Result<bool, ClientApiError> {
249 self.validate_error()?;
250 Ok(match self.success_result {
251 Some(SuccessResult::WriteAck(success)) => success,
252 _ => false,
253 })
254 }
255
256 /// Convert response to read results
257 ///
258 /// # Returns
259 /// Vector of optional key-value pairs wrapped in Result
260 pub fn into_read_results(
261 &self
262 ) -> std::result::Result<Vec<Option<ClientResult>>, ClientApiError> {
263 self.validate_error()?;
264 match &self.success_result {
265 Some(SuccessResult::ReadData(data)) => data
266 .results
267 .clone()
268 .into_iter()
269 .map(|item| {
270 Ok(Some(ClientResult {
271 key: item.key.to_vec(),
272 value: item.value.to_vec(),
273 }))
274 })
275 .collect(),
276 _ => {
277 error!("Invalid response type for read operation");
278 unreachable!()
279 }
280 }
281 }
282
283 /// Validate error code in response header
284 ///
285 /// # Internal Logic
286 /// Converts numeric error code to enum variant
287 pub(crate) fn validate_error(&self) -> std::result::Result<(), ClientApiError> {
288 match ErrorCode::try_from(self.error).unwrap_or(ErrorCode::Uncategorized) {
289 ErrorCode::Success => Ok(()),
290 e => Err(e.into()),
291 }
292 }
293
294 /// Check if this response indicates the leader's term is outdated
295 pub fn is_term_outdated(&self) -> bool {
296 ErrorCode::try_from(self.error).map(|e| e.is_term_outdated()).unwrap_or(false)
297 }
298
299 /// Check if this response indicates a quorum timeout or failure to receive majority responses
300 pub fn is_quorum_timeout_or_failure(&self) -> bool {
301 ErrorCode::try_from(self.error)
302 .map(|e| e.is_quorum_timeout_or_failure())
303 .unwrap_or(false)
304 }
305
306 /// Check if this response indicates a failure to receive majority responses
307 pub fn is_propose_failure(&self) -> bool {
308 ErrorCode::try_from(self.error).map(|e| e.is_propose_failure()).unwrap_or(false)
309 }
310
311 /// Check if this response indicates a a retry required
312 pub fn is_retry_required(&self) -> bool {
313 ErrorCode::try_from(self.error).map(|e| e.is_retry_required()).unwrap_or(false)
314 }
315}
316
317impl ErrorCode {
318 /// Check if this error indicates the leader's term is outdated
319 pub(crate) fn is_term_outdated(&self) -> bool {
320 matches!(self, ErrorCode::TermOutdated)
321 }
322
323 /// Check if this error indicates a quorum timeout or failure to receive majority responses
324 pub(crate) fn is_quorum_timeout_or_failure(&self) -> bool {
325 matches!(
326 self,
327 ErrorCode::ConnectionTimeout | ErrorCode::ProposeFailed | ErrorCode::ClusterUnavailable
328 )
329 }
330
331 /// Check if this error indicates a failure to receive majority responses
332 pub(crate) fn is_propose_failure(&self) -> bool {
333 matches!(self, ErrorCode::ProposeFailed)
334 }
335
336 /// Check if this error indicates a retry required
337 pub(crate) fn is_retry_required(&self) -> bool {
338 matches!(self, ErrorCode::RetryRequired)
339 }
340}