d_engine/client/mod.rs
1//! Client module for distributed consensus system
2//!
3//! Provides core components for interacting with the d_engine cluster:
4//! - [`Client`] - Main entry point with cluster access
5//! - [`ClientBuilder`] - Configurable client construction
6//! - [`KvClient`] - Key-value store operations
7//! - [`ClusterClient`] - Cluster management operations
8//! - [`ConnectionPool`] - Underlying connection management
9//!
10//! # Basic Usage
11//! ```no_run
12//! use d_engine::client::{Client, ClientBuilder};
13//! use std::time::Duration;
14//! use core::error::Error;
15//!
16//! #[tokio::main(flavor = "current_thread")]
17//! async fn main(){
18//! // Initialize client with automatic cluster discovery
19//! let client = Client::builder(vec![
20//! "http://node1:9081".into(),
21//! "http://node2:9082".into()
22//! ])
23//! .connect_timeout(Duration::from_secs(3))
24//! .request_timeout(Duration::from_secs(1))
25//! .enable_compression(true)
26//! .build()
27//! .await
28//! .unwrap();
29//!
30//! // Execute key-value operations
31//! client.kv().put("user:1001", "Alice").await.unwrap();
32//!
33//! let value = client.kv().get("user:1001", false).await.unwrap();
34//!
35//! println!("User data: {:?}", value);
36//!
37//! // Perform cluster management
38//! let members = client.cluster().list_members().await.unwrap();
39//! println!("Cluster members: {:?}", members);
40//!
41//! }
42//! ```
43
44mod builder;
45mod client;
46mod cluster;
47mod config;
48mod kv;
49mod pool;
50
51pub use builder::*;
52pub use client::*;
53pub use cluster::*;
54pub use config::*;
55pub use kv::*;
56use log::error;
57pub use pool::*;
58
59#[cfg(test)]
60mod pool_test;
61
62//---
63
64use crate::proto::client_command;
65use crate::proto::client_response;
66use crate::proto::ClientCommand;
67use crate::proto::ClientRequestError;
68use crate::proto::ClientResponse;
69use crate::proto::ClientResult;
70use crate::proto::ReadResults;
71use crate::Error;
72use crate::Result;
73
74impl ClientCommand {
75 /// Create read command for specified key
76 ///
77 /// # Parameters
78 /// - `key`: Byte array representing data key
79 pub fn get(key: impl AsRef<[u8]>) -> Self {
80 Self {
81 command: Some(client_command::Command::Get(key.as_ref().to_vec())),
82 }
83 }
84
85 /// Create write command for key-value pair
86 ///
87 /// # Parameters
88 /// - `key`: Byte array for storage key
89 /// - `value`: Byte array to be stored
90 pub fn insert(
91 key: impl AsRef<[u8]>,
92 value: impl AsRef<[u8]>,
93 ) -> Self {
94 let insert_cmd = client_command::Insert {
95 key: key.as_ref().to_vec(),
96 value: value.as_ref().to_vec(),
97 };
98 Self {
99 command: Some(client_command::Command::Insert(insert_cmd)),
100 }
101 }
102
103 /// Create deletion command for specified key
104 ///
105 /// # Parameters
106 /// - `key`: Byte array of key to delete
107 pub fn delete(key: impl AsRef<[u8]>) -> Self {
108 Self {
109 command: Some(client_command::Command::Delete(key.as_ref().to_vec())),
110 }
111 }
112
113 /// Create empty operation command for heartbeat detection
114 ///
115 /// # Usage
116 /// Maintains connection activity without data operation
117 pub(crate) fn no_op() -> Self {
118 Self {
119 command: Some(client_command::Command::NoOp(true)),
120 }
121 }
122}
123impl ClientResponse {
124 /// Build success response for write operations
125 ///
126 /// # Returns
127 /// Response with NoError code and write confirmation
128 pub fn write_success() -> Self {
129 Self {
130 error_code: ClientRequestError::NoError as i32,
131 result: Some(client_response::Result::WriteResult(true)),
132 }
133 }
134 /// Build error response for write operations
135 ///
136 /// # Parameters
137 /// - `error`: Error type implementing conversion to ClientRequestError
138 pub fn write_error(error: Error) -> Self {
139 Self {
140 error_code: <ClientRequestError as Into<i32>>::into(ClientRequestError::from(error)),
141 result: None,
142 }
143 }
144
145 /// Build success response for read operations
146 ///
147 /// # Parameters
148 /// - `results`: Vector of retrieved key-value pairs
149 pub fn read_results(results: Vec<ClientResult>) -> Self {
150 Self {
151 error_code: ClientRequestError::NoError as i32,
152 result: Some(client_response::Result::ReadResults(ReadResults { results })),
153 }
154 }
155
156 /// Build generic error response for any operation type
157 ///
158 /// # Parameters
159 /// - `error`: Predefined client request error code
160 pub fn error(error: ClientRequestError) -> Self {
161 Self {
162 error_code: error as i32,
163 result: None,
164 }
165 }
166
167 /// Convert response to boolean write result
168 ///
169 /// # Returns
170 /// - `Ok(true)` on successful write
171 /// - `Err` with converted error code on failure
172 pub fn into_write_result(&self) -> Result<bool> {
173 self.validate_error()?;
174 Ok(match self.result {
175 Some(client_response::Result::WriteResult(success)) => success,
176 _ => false,
177 })
178 }
179
180 /// Convert response to read results
181 ///
182 /// # Returns
183 /// Vector of optional key-value pairs wrapped in Result
184 pub fn into_read_results(&self) -> Result<Vec<Option<ClientResult>>> {
185 self.validate_error()?;
186 match &self.result {
187 Some(client_response::Result::ReadResults(read_results)) => read_results
188 .results
189 .clone()
190 .into_iter()
191 .map(|item| {
192 Ok(Some(ClientResult {
193 key: item.key.to_vec(),
194 value: item.value.to_vec(),
195 }))
196 })
197 .collect(),
198 _ => {
199 error!("Invalid response type for read operation");
200 Err(Error::InvalidResponseType)
201 }
202 }
203 }
204
205 /// Validate error code in response header
206 ///
207 /// # Internal Logic
208 /// Converts numeric error code to enum variant
209 fn validate_error(&self) -> Result<()> {
210 match ClientRequestError::try_from(self.error_code).unwrap_or(ClientRequestError::NoError) {
211 ClientRequestError::NoError => Ok(()),
212 e => Err(e.into()),
213 }
214 }
215}