d_engine/client/
mod.rs

1//! Client module for distributed consensus system
2//!
3//! Provides core components for interacting with the d_engine cluster:
4//! - [`Client`] - Main entry point with cluster access
5//! - [`ClientBuilder`] - Configurable client construction
6//! - [`KvClient`] - Key-value store operations
7//! - [`ClusterClient`] - Cluster management operations
8//! - [`ConnectionPool`] - Underlying connection management
9//!
10//! # Basic Usage
11//! ```no_run
12//! use d_engine::client::{Client, ClientBuilder};
13//! use std::time::Duration;
14//! use core::error::Error;
15//!
16//! #[tokio::main(flavor = "current_thread")]
17//! async fn main(){
18//!     // Initialize client with automatic cluster discovery
19//!     let client = Client::builder(vec![
20//!         "http://node1:9081".into(),
21//!         "http://node2:9082".into()
22//!     ])
23//!     .connect_timeout(Duration::from_secs(3))
24//!     .request_timeout(Duration::from_secs(1))
25//!     .enable_compression(true)
26//!     .build()
27//!     .await
28//!     .unwrap();
29//!
30//!     // Execute key-value operations
31//!     client.kv().put("user:1001", "Alice").await.unwrap();
32//!
33//!     let value = client.kv().get("user:1001").await.unwrap();
34//!
35//!     println!("User data: {:?}", value);
36//!
37//!     // Perform cluster management
38//!     let members = client.cluster().list_members().await.unwrap();
39//!     println!("Cluster members: {:?}", members);
40//!
41//! }
42//! ```
43
44mod builder;
45mod cluster;
46mod config;
47mod error;
48mod kv;
49mod pool;
50
51pub use builder::*;
52use bytes::Bytes;
53pub use cluster::*;
54pub use config::*;
55pub use error::*;
56pub use kv::*;
57pub use pool::*;
58
59#[cfg(test)]
60mod cluster_test;
61#[cfg(test)]
62mod kv_test;
63#[cfg(test)]
64mod pool_test;
65
66use std::sync::Arc;
67
68use arc_swap::ArcSwap;
69use tracing::error;
70
71use crate::proto::client::client_response::SuccessResult;
72use crate::proto::client::write_command;
73use crate::proto::client::ClientResponse;
74use crate::proto::client::ClientResult;
75use crate::proto::client::ReadResults;
76use crate::proto::client::WriteCommand;
77use crate::proto::error::ErrorCode;
78
79/// Main entry point for interacting with the d_engine cluster
80///
81/// Manages connections and provides access to specialized clients:
82/// - Use [`kv()`](Client::kv) for data operations
83/// - Use [`cluster()`](Client::cluster) for cluster administration
84///
85/// Created through the [`builder()`](Client::builder) method
86#[derive(Clone)]
87pub struct Client {
88    /// Key-value store client interface
89    pub(super) kv: KvClient,
90
91    /// Cluster management client interface
92    pub(super) cluster: ClusterClient,
93
94    pub(super) inner: Arc<ArcSwap<ClientInner>>,
95}
96
97#[derive(Clone)]
98pub struct ClientInner {
99    pub(super) pool: ConnectionPool,
100    pub(super) client_id: u32,
101    pub(super) config: ClientConfig,
102    pub(super) endpoints: Vec<String>,
103}
104
105impl Client {
106    /// Access the key-value operations client
107    ///
108    /// # Examples
109    /// ```rust,ignore
110    /// client.kv().put("key", "value").await?;
111    /// ```
112    pub fn kv(&self) -> &KvClient {
113        &self.kv
114    }
115
116    /// Access the cluster management client
117    ///
118    /// # Examples
119    /// ```rust,ignore
120    /// client.cluster().add_node("node3:9083").await?;
121    /// ```
122    pub fn cluster(&self) -> &ClusterClient {
123        &self.cluster
124    }
125
126    /// Create a configured client builder
127    ///
128    /// Starts client construction process with specified bootstrap endpoints.
129    /// Chain configuration methods before calling
130    /// [`build()`](ClientBuilder::build).
131    ///
132    /// # Arguments
133    /// * `endpoints` - Initial cluster nodes for discovery
134    ///
135    /// # Panics
136    /// Will panic if no valid endpoints provided
137    pub fn builder(endpoints: Vec<String>) -> ClientBuilder {
138        assert!(!endpoints.is_empty(), "At least one endpoint required");
139        ClientBuilder::new(endpoints)
140    }
141
142    pub async fn refresh(
143        &mut self,
144        new_endpoints: Option<Vec<String>>,
145    ) -> std::result::Result<(), ClientApiError> {
146        // Get a writable lock
147        let old_inner = self.inner.load();
148        let config = old_inner.config.clone();
149        let endpoints = new_endpoints.unwrap_or(old_inner.endpoints.clone());
150
151        let new_pool = ConnectionPool::create(endpoints.clone(), config.clone()).await?;
152
153        let new_inner = Arc::new(ClientInner {
154            pool: new_pool,
155            client_id: old_inner.client_id,
156            config,
157            endpoints,
158        });
159
160        self.inner.store(new_inner);
161        Ok(())
162    }
163}
164
165impl WriteCommand {
166    /// Create write command for key-value pair
167    ///
168    /// # Parameters
169    /// - `key`: Byte array for storage key
170    /// - `value`: Byte array to be stored
171    pub fn insert(
172        key: impl Into<Bytes>,
173        value: impl Into<Bytes>,
174    ) -> Self {
175        let cmd = write_command::Insert {
176            key: key.into(),
177            value: value.into(),
178        };
179        Self {
180            operation: Some(write_command::Operation::Insert(cmd)),
181        }
182    }
183
184    /// Create deletion command for specified key
185    ///
186    /// # Parameters
187    /// - `key`: Byte array of key to delete
188    pub fn delete(key: impl Into<Bytes>) -> Self {
189        let cmd = write_command::Delete { key: key.into() };
190        Self {
191            operation: Some(write_command::Operation::Delete(cmd)),
192        }
193    }
194}
195impl ClientResponse {
196    /// Build success response for write operations
197    ///
198    /// # Returns
199    /// Response with NoError code and write confirmation
200    pub fn write_success() -> Self {
201        Self {
202            error: ErrorCode::Success as i32,
203            success_result: Some(SuccessResult::WriteAck(true)),
204            metadata: None,
205        }
206    }
207
208    /// Check if the write operation was successful
209    ///
210    /// # Returns
211    /// - `true` if the response indicates a successful write operation
212    /// - `false` if the response indicates a failed write operation or is not a write response
213    pub fn is_write_success(&self) -> bool {
214        self.error == ErrorCode::Success as i32
215            && matches!(self.success_result, Some(SuccessResult::WriteAck(true)))
216    }
217
218    /// Build success response for read operations
219    ///
220    /// # Parameters
221    /// - `results`: Vector of retrieved key-value pairs
222    pub fn read_results(results: Vec<ClientResult>) -> Self {
223        Self {
224            error: ErrorCode::Success as i32,
225            success_result: Some(SuccessResult::ReadData(ReadResults { results })),
226            metadata: None,
227        }
228    }
229
230    /// Build generic error response for any operation type
231    ///
232    /// # Parameters
233    /// - `error_code`: Predefined client request error code
234    pub fn client_error(error_code: ErrorCode) -> Self {
235        Self {
236            error: error_code as i32,
237            success_result: None,
238            metadata: None,
239        }
240    }
241
242    /// Convert response to boolean write result
243    ///
244    /// # Returns
245    /// - `Ok(true)` on successful write
246    /// - `Err` with converted error code on failure
247    pub fn into_write_result(&self) -> std::result::Result<bool, ClientApiError> {
248        self.validate_error()?;
249        Ok(match self.success_result {
250            Some(SuccessResult::WriteAck(success)) => success,
251            _ => false,
252        })
253    }
254
255    /// Convert response to read results
256    ///
257    /// # Returns
258    /// Vector of optional key-value pairs wrapped in Result
259    pub fn into_read_results(
260        &self
261    ) -> std::result::Result<Vec<Option<ClientResult>>, ClientApiError> {
262        self.validate_error()?;
263        match &self.success_result {
264            Some(SuccessResult::ReadData(data)) => data
265                .results
266                .clone()
267                .into_iter()
268                .map(|item| {
269                    Ok(Some(ClientResult {
270                        key: item.key,
271                        value: item.value,
272                    }))
273                })
274                .collect(),
275            _ => {
276                error!("Invalid response type for read operation");
277                unreachable!()
278            }
279        }
280    }
281
282    /// Validate error code in response header
283    ///
284    /// # Internal Logic
285    /// Converts numeric error code to enum variant
286    pub(crate) fn validate_error(&self) -> std::result::Result<(), ClientApiError> {
287        match ErrorCode::try_from(self.error).unwrap_or(ErrorCode::Uncategorized) {
288            ErrorCode::Success => Ok(()),
289            e => Err(e.into()),
290        }
291    }
292
293    /// Check if this response indicates the leader's term is outdated
294    pub fn is_term_outdated(&self) -> bool {
295        ErrorCode::try_from(self.error).map(|e| e.is_term_outdated()).unwrap_or(false)
296    }
297
298    /// Check if this response indicates a quorum timeout or failure to receive majority responses
299    pub fn is_quorum_timeout_or_failure(&self) -> bool {
300        ErrorCode::try_from(self.error)
301            .map(|e| e.is_quorum_timeout_or_failure())
302            .unwrap_or(false)
303    }
304
305    /// Check if this response indicates a failure to receive majority responses
306    pub fn is_propose_failure(&self) -> bool {
307        ErrorCode::try_from(self.error).map(|e| e.is_propose_failure()).unwrap_or(false)
308    }
309
310    /// Check if this response indicates a a retry required
311    pub fn is_retry_required(&self) -> bool {
312        ErrorCode::try_from(self.error).map(|e| e.is_retry_required()).unwrap_or(false)
313    }
314}
315
316impl ErrorCode {
317    /// Check if this error indicates the leader's term is outdated
318    pub(crate) fn is_term_outdated(&self) -> bool {
319        matches!(self, ErrorCode::TermOutdated)
320    }
321
322    /// Check if this error indicates a quorum timeout or failure to receive majority responses
323    pub(crate) fn is_quorum_timeout_or_failure(&self) -> bool {
324        matches!(
325            self,
326            ErrorCode::ConnectionTimeout | ErrorCode::ProposeFailed | ErrorCode::ClusterUnavailable
327        )
328    }
329
330    /// Check if this error indicates a failure to receive majority responses
331    pub(crate) fn is_propose_failure(&self) -> bool {
332        matches!(self, ErrorCode::ProposeFailed)
333    }
334
335    /// Check if this error indicates a retry required
336    pub(crate) fn is_retry_required(&self) -> bool {
337        matches!(self, ErrorCode::RetryRequired)
338    }
339}