d_engine/client/
mod.rs

1//! Client module for distributed consensus system
2//!
3//! Provides core components for interacting with the d_engine cluster:
4//! - [`Client`] - Main entry point with cluster access
5//! - [`ClientBuilder`] - Configurable client construction
6//! - [`KvClient`] - Key-value store operations
7//! - [`ClusterClient`] - Cluster management operations
8//! - [`ConnectionPool`] - Underlying connection management
9//!
10//! # Basic Usage
11//! ```no_run
12//! use d_engine::client::{Client, ClientBuilder};
13//! use std::time::Duration;
14//! use core::error::Error;
15//!
16//! #[tokio::main(flavor = "current_thread")]
17//! async fn main(){
18//!     // Initialize client with automatic cluster discovery
19//!     let client = Client::builder(vec![
20//!         "http://node1:9081".into(),
21//!         "http://node2:9082".into()
22//!     ])
23//!     .connect_timeout(Duration::from_secs(3))
24//!     .request_timeout(Duration::from_secs(1))
25//!     .enable_compression(true)
26//!     .build()
27//!     .await
28//!     .unwrap();
29//!
30//!     // Execute key-value operations
31//!     client.kv().put("user:1001", "Alice").await.unwrap();
32//!
33//!     let value = client.kv().get("user:1001", false).await.unwrap();
34//!
35//!     println!("User data: {:?}", value);
36//!
37//!     // Perform cluster management
38//!     let members = client.cluster().list_members().await.unwrap();
39//!     println!("Cluster members: {:?}", members);
40//!
41//! }
42//! ```
43
44mod builder;
45mod cluster;
46mod config;
47mod error;
48mod kv;
49mod pool;
50
51pub use builder::*;
52pub use cluster::*;
53pub use config::*;
54pub use error::*;
55pub use kv::*;
56pub use pool::*;
57
58#[cfg(test)]
59mod cluster_test;
60#[cfg(test)]
61mod kv_test;
62#[cfg(test)]
63mod pool_test;
64
65use std::sync::Arc;
66
67use arc_swap::ArcSwap;
68use tracing::error;
69
70use crate::proto::client::client_response::SuccessResult;
71use crate::proto::client::write_command;
72use crate::proto::client::ClientResponse;
73use crate::proto::client::ClientResult;
74use crate::proto::client::ReadResults;
75use crate::proto::client::WriteCommand;
76use crate::proto::error::ErrorCode;
77
78/// Main entry point for interacting with the d_engine cluster
79///
80/// Manages connections and provides access to specialized clients:
81/// - Use [`kv()`](Client::kv) for data operations
82/// - Use [`cluster()`](Client::cluster) for cluster administration
83///
84/// Created through the [`builder()`](Client::builder) method
85#[derive(Clone)]
86pub struct Client {
87    /// Key-value store client interface
88    pub(super) kv: KvClient,
89
90    /// Cluster management client interface
91    pub(super) cluster: ClusterClient,
92
93    pub(super) inner: Arc<ArcSwap<ClientInner>>,
94}
95
96#[derive(Clone)]
97pub struct ClientInner {
98    pub(super) pool: ConnectionPool,
99    pub(super) client_id: u32,
100    pub(super) config: ClientConfig,
101    pub(super) endpoints: Vec<String>,
102}
103
104impl Client {
105    /// Access the key-value operations client
106    ///
107    /// # Examples
108    /// ```rust,ignore
109    /// client.kv().put("key", "value").await?;
110    /// ```
111    pub fn kv(&self) -> &KvClient {
112        &self.kv
113    }
114
115    /// Access the cluster management client
116    ///
117    /// # Examples
118    /// ```rust,ignore
119    /// client.cluster().add_node("node3:9083").await?;
120    /// ```
121    pub fn cluster(&self) -> &ClusterClient {
122        &self.cluster
123    }
124
125    /// Create a configured client builder
126    ///
127    /// Starts client construction process with specified bootstrap endpoints.
128    /// Chain configuration methods before calling
129    /// [`build()`](ClientBuilder::build).
130    ///
131    /// # Arguments
132    /// * `endpoints` - Initial cluster nodes for discovery
133    ///
134    /// # Panics
135    /// Will panic if no valid endpoints provided
136    pub fn builder(endpoints: Vec<String>) -> ClientBuilder {
137        assert!(!endpoints.is_empty(), "At least one endpoint required");
138        ClientBuilder::new(endpoints)
139    }
140
141    pub async fn refresh(
142        &mut self,
143        new_endpoints: Option<Vec<String>>,
144    ) -> std::result::Result<(), ClientApiError> {
145        // Get a writable lock
146        let old_inner = self.inner.load();
147        let config = old_inner.config.clone();
148        let endpoints = new_endpoints.unwrap_or(old_inner.endpoints.clone());
149
150        let new_pool = ConnectionPool::create(endpoints.clone(), config.clone()).await?;
151
152        let new_inner = Arc::new(ClientInner {
153            pool: new_pool,
154            client_id: old_inner.client_id,
155            config,
156            endpoints,
157        });
158
159        self.inner.store(new_inner);
160        Ok(())
161    }
162}
163
164impl WriteCommand {
165    /// Create write command for key-value pair
166    ///
167    /// # Parameters
168    /// - `key`: Byte array for storage key
169    /// - `value`: Byte array to be stored
170    pub fn insert(
171        key: impl AsRef<[u8]>,
172        value: impl AsRef<[u8]>,
173    ) -> Self {
174        let cmd = write_command::Insert {
175            key: key.as_ref().to_vec(),
176            value: value.as_ref().to_vec(),
177        };
178        Self {
179            operation: Some(write_command::Operation::Insert(cmd)),
180        }
181    }
182
183    /// Create deletion command for specified key
184    ///
185    /// # Parameters
186    /// - `key`: Byte array of key to delete
187    pub fn delete(key: impl AsRef<[u8]>) -> Self {
188        let cmd = write_command::Delete {
189            key: key.as_ref().to_vec(),
190        };
191        Self {
192            operation: Some(write_command::Operation::Delete(cmd)),
193        }
194    }
195}
196impl ClientResponse {
197    /// Build success response for write operations
198    ///
199    /// # Returns
200    /// Response with NoError code and write confirmation
201    pub fn write_success() -> Self {
202        Self {
203            error: ErrorCode::Success as i32,
204            success_result: Some(SuccessResult::WriteAck(true)),
205            metadata: None,
206        }
207    }
208
209    /// Check if the write operation was successful
210    ///
211    /// # Returns
212    /// - `true` if the response indicates a successful write operation
213    /// - `false` if the response indicates a failed write operation or is not a write response
214    pub fn is_write_success(&self) -> bool {
215        self.error == ErrorCode::Success as i32
216            && matches!(self.success_result, Some(SuccessResult::WriteAck(true)))
217    }
218
219    /// Build success response for read operations
220    ///
221    /// # Parameters
222    /// - `results`: Vector of retrieved key-value pairs
223    pub fn read_results(results: Vec<ClientResult>) -> Self {
224        Self {
225            error: ErrorCode::Success as i32,
226            success_result: Some(SuccessResult::ReadData(ReadResults { results })),
227            metadata: None,
228        }
229    }
230
231    /// Build generic error response for any operation type
232    ///
233    /// # Parameters
234    /// - `error_code`: Predefined client request error code
235    pub fn client_error(error_code: ErrorCode) -> Self {
236        Self {
237            error: error_code as i32,
238            success_result: None,
239            metadata: None,
240        }
241    }
242
243    /// Convert response to boolean write result
244    ///
245    /// # Returns
246    /// - `Ok(true)` on successful write
247    /// - `Err` with converted error code on failure
248    pub fn into_write_result(&self) -> std::result::Result<bool, ClientApiError> {
249        self.validate_error()?;
250        Ok(match self.success_result {
251            Some(SuccessResult::WriteAck(success)) => success,
252            _ => false,
253        })
254    }
255
256    /// Convert response to read results
257    ///
258    /// # Returns
259    /// Vector of optional key-value pairs wrapped in Result
260    pub fn into_read_results(
261        &self
262    ) -> std::result::Result<Vec<Option<ClientResult>>, ClientApiError> {
263        self.validate_error()?;
264        match &self.success_result {
265            Some(SuccessResult::ReadData(data)) => data
266                .results
267                .clone()
268                .into_iter()
269                .map(|item| {
270                    Ok(Some(ClientResult {
271                        key: item.key.to_vec(),
272                        value: item.value.to_vec(),
273                    }))
274                })
275                .collect(),
276            _ => {
277                error!("Invalid response type for read operation");
278                unreachable!()
279            }
280        }
281    }
282
283    /// Validate error code in response header
284    ///
285    /// # Internal Logic
286    /// Converts numeric error code to enum variant
287    pub(crate) fn validate_error(&self) -> std::result::Result<(), ClientApiError> {
288        match ErrorCode::try_from(self.error).unwrap_or(ErrorCode::Uncategorized) {
289            ErrorCode::Success => Ok(()),
290            e => Err(e.into()),
291        }
292    }
293
294    /// Check if this response indicates the leader's term is outdated
295    pub fn is_term_outdated(&self) -> bool {
296        ErrorCode::try_from(self.error).map(|e| e.is_term_outdated()).unwrap_or(false)
297    }
298
299    /// Check if this response indicates a quorum timeout or failure to receive majority responses
300    pub fn is_quorum_timeout_or_failure(&self) -> bool {
301        ErrorCode::try_from(self.error)
302            .map(|e| e.is_quorum_timeout_or_failure())
303            .unwrap_or(false)
304    }
305
306    /// Check if this response indicates a failure to receive majority responses
307    pub fn is_propose_failure(&self) -> bool {
308        ErrorCode::try_from(self.error).map(|e| e.is_propose_failure()).unwrap_or(false)
309    }
310
311    /// Check if this response indicates a a retry required
312    pub fn is_retry_required(&self) -> bool {
313        ErrorCode::try_from(self.error).map(|e| e.is_retry_required()).unwrap_or(false)
314    }
315}
316
317impl ErrorCode {
318    /// Check if this error indicates the leader's term is outdated
319    pub(crate) fn is_term_outdated(&self) -> bool {
320        matches!(self, ErrorCode::TermOutdated)
321    }
322
323    /// Check if this error indicates a quorum timeout or failure to receive majority responses
324    pub(crate) fn is_quorum_timeout_or_failure(&self) -> bool {
325        matches!(
326            self,
327            ErrorCode::ConnectionTimeout | ErrorCode::ProposeFailed | ErrorCode::ClusterUnavailable
328        )
329    }
330
331    /// Check if this error indicates a failure to receive majority responses
332    pub(crate) fn is_propose_failure(&self) -> bool {
333        matches!(self, ErrorCode::ProposeFailed)
334    }
335
336    /// Check if this error indicates a retry required
337    pub(crate) fn is_retry_required(&self) -> bool {
338        matches!(self, ErrorCode::RetryRequired)
339    }
340}