Skip to main content

d_engine_proto/exts/
client_ext.rs

1//! Extensions for protobuf-generated client types
2//!
3//! This module provides additional methods for protobuf-generated types
4//! that are not automatically generated by the protobuf compiler.
5
6use bytes::Bytes;
7
8use crate::client::ClientReadRequest;
9use crate::client::ClientResponse;
10use crate::client::ClientResult;
11use crate::client::ReadConsistencyPolicy;
12use crate::client::ReadResults;
13use crate::client::WriteCommand;
14use crate::client::WriteResult;
15use crate::client::client_response::SuccessResult;
16use crate::client::write_command;
17use crate::error::ErrorCode;
18use crate::error::ErrorMetadata;
19
20impl ClientReadRequest {
21    /// Checks if the consistency_policy field is present in the request
22    ///
23    /// Returns true if the client explicitly specified a consistency policy,
24    /// false if the field is absent (should use server default).
25    pub fn has_consistency_policy(&self) -> bool {
26        self.consistency_policy.is_some()
27    }
28
29    /// Gets the consistency policy value safely
30    ///
31    /// Returns Some(policy) if present, None if field is absent.
32    /// Safer alternative that doesn't panic.
33    pub fn get_consistency_policy(&self) -> Option<ReadConsistencyPolicy> {
34        self.consistency_policy.and_then(|policy_i32| {
35            match policy_i32 {
36                x if x == ReadConsistencyPolicy::LeaseRead as i32 => {
37                    Some(ReadConsistencyPolicy::LeaseRead)
38                }
39                x if x == ReadConsistencyPolicy::LinearizableRead as i32 => {
40                    Some(ReadConsistencyPolicy::LinearizableRead)
41                }
42                _ => None, // Invalid value
43            }
44        })
45    }
46}
47
48impl WriteCommand {
49    /// Create write command for key-value pair
50    ///
51    /// # Parameters
52    /// - `key`: Byte array for storage key
53    /// - `value`: Byte array to be stored
54    pub fn insert(
55        key: impl Into<Bytes>,
56        value: impl Into<Bytes>,
57    ) -> Self {
58        let cmd = write_command::Insert {
59            key: key.into(),
60            value: value.into(),
61            ttl_secs: 0,
62        };
63        Self {
64            operation: Some(write_command::Operation::Insert(cmd)),
65        }
66    }
67
68    /// Create write command for key-value pair with TTL
69    ///
70    /// # Parameters
71    /// - `key`: Byte array for storage key
72    /// - `value`: Byte array to be stored
73    /// - `ttl_secs`: Time-to-live in seconds
74    pub fn insert_with_ttl(
75        key: impl Into<Bytes>,
76        value: impl Into<Bytes>,
77        ttl_secs: u64,
78    ) -> Self {
79        let cmd = write_command::Insert {
80            key: key.into(),
81            value: value.into(),
82            ttl_secs,
83        };
84        Self {
85            operation: Some(write_command::Operation::Insert(cmd)),
86        }
87    }
88
89    /// Create deletion command for specified key
90    ///
91    /// # Parameters
92    /// - `key`: Byte array of key to delete
93    pub fn delete(key: impl Into<Bytes>) -> Self {
94        let cmd = write_command::Delete { key: key.into() };
95        Self {
96            operation: Some(write_command::Operation::Delete(cmd)),
97        }
98    }
99
100    /// Create compare-and-swap command
101    ///
102    /// # Parameters
103    /// - `key`: Key to operate on
104    /// - `expected_value`: Expected current value (None means key must not exist)
105    /// - `new_value`: New value to set if comparison succeeds
106    pub fn compare_and_swap(
107        key: impl Into<Bytes>,
108        expected_value: Option<impl Into<Bytes>>,
109        new_value: impl Into<Bytes>,
110    ) -> Self {
111        let cmd = write_command::CompareAndSwap {
112            key: key.into(),
113            expected_value: expected_value.map(|v| v.into()),
114            new_value: new_value.into(),
115        };
116        Self {
117            operation: Some(write_command::Operation::CompareAndSwap(cmd)),
118        }
119    }
120}
121
122impl ClientResponse {
123    /// Build success response for write operations
124    ///
125    /// # Returns
126    /// Response with Success code and result=true
127    pub fn write_success() -> Self {
128        Self {
129            error: ErrorCode::Success as i32,
130            success_result: Some(SuccessResult::WriteResult(WriteResult { succeeded: true })),
131            metadata: None,
132        }
133    }
134
135    /// Build CAS success response (comparison succeeded, value updated)
136    ///
137    /// # Returns
138    /// Response with Success code and result=true (CAS succeeded)
139    pub fn cas_success() -> Self {
140        Self {
141            error: ErrorCode::Success as i32,
142            success_result: Some(SuccessResult::WriteResult(WriteResult { succeeded: true })),
143            metadata: None,
144        }
145    }
146
147    /// Build CAS failure response (comparison failed, value NOT updated)
148    ///
149    /// # Returns
150    /// Response with Success code and result=false (CAS failed due to mismatch)
151    pub fn cas_failure() -> Self {
152        Self {
153            error: ErrorCode::Success as i32,
154            success_result: Some(SuccessResult::WriteResult(WriteResult { succeeded: false })),
155            metadata: None,
156        }
157    }
158
159    /// Check if operation completed without error
160    ///
161    /// # Returns
162    /// - `true` if no error occurred (operation executed)
163    /// - `false` if error occurred or invalid response
164    pub fn succeeded(&self) -> bool {
165        self.error == ErrorCode::Success as i32
166            && matches!(self.success_result, Some(SuccessResult::WriteResult(_)))
167    }
168
169    /// Check if write operation succeeded with true result
170    ///
171    /// Strict check for Put/Delete operations ensuring result is true.
172    /// For CAS, use pattern matching to distinguish true/false results.
173    ///
174    /// # Returns
175    /// - `true` for successful Put/Delete (result must be true)
176    /// - `false` otherwise
177    pub fn is_write_success(&self) -> bool {
178        self.error == ErrorCode::Success as i32
179            && matches!(
180                self.success_result,
181                Some(SuccessResult::WriteResult(WriteResult { succeeded: true }))
182            )
183    }
184
185    /// Build success response for read operations
186    ///
187    /// # Parameters
188    /// - `results`: Vector of retrieved key-value pairs
189    pub fn read_results(results: Vec<ClientResult>) -> Self {
190        Self {
191            error: ErrorCode::Success as i32,
192            success_result: Some(SuccessResult::ReadData(ReadResults { results })),
193            metadata: None,
194        }
195    }
196
197    /// Build generic error response for any operation type
198    ///
199    /// # Parameters
200    /// - `error_code`: Predefined client request error code
201    pub fn client_error(error_code: ErrorCode) -> Self {
202        Self {
203            error: error_code as i32,
204            success_result: None,
205            metadata: None,
206        }
207    }
208
209    /// Build NOT_LEADER error response with leader metadata
210    ///
211    /// # Parameters
212    /// - `leader_id`: Optional leader node ID
213    /// - `leader_address`: Optional leader address
214    pub fn not_leader(
215        leader_id: Option<String>,
216        leader_address: Option<String>,
217    ) -> Self {
218        let metadata = if leader_id.is_some() || leader_address.is_some() {
219            Some(ErrorMetadata {
220                retry_after_ms: None,
221                leader_id,
222                leader_address,
223                debug_message: None,
224            })
225        } else {
226            None
227        };
228
229        Self {
230            error: ErrorCode::NotLeader as i32,
231            success_result: None,
232            metadata,
233        }
234    }
235
236    /// Check if this response indicates the leader's term is outdated
237    pub fn is_term_outdated(&self) -> bool {
238        ErrorCode::try_from(self.error).map(|e| e.is_term_outdated()).unwrap_or(false)
239    }
240
241    /// Check if this response indicates a quorum timeout or failure to receive majority responses
242    pub fn is_quorum_timeout_or_failure(&self) -> bool {
243        ErrorCode::try_from(self.error)
244            .map(|e| e.is_quorum_timeout_or_failure())
245            .unwrap_or(false)
246    }
247
248    /// Check if this response indicates a failure to receive majority responses
249    pub fn is_propose_failure(&self) -> bool {
250        ErrorCode::try_from(self.error).map(|e| e.is_propose_failure()).unwrap_or(false)
251    }
252
253    /// Check if this response indicates a a retry required
254    pub fn is_retry_required(&self) -> bool {
255        ErrorCode::try_from(self.error).map(|e| e.is_retry_required()).unwrap_or(false)
256    }
257}
258
259impl ErrorCode {
260    /// Check if this error indicates the leader's term is outdated
261    pub fn is_term_outdated(&self) -> bool {
262        matches!(self, ErrorCode::TermOutdated)
263    }
264
265    /// Check if this error indicates a quorum timeout or failure to receive majority responses
266    pub fn is_quorum_timeout_or_failure(&self) -> bool {
267        matches!(
268            self,
269            ErrorCode::ConnectionTimeout | ErrorCode::ProposeFailed | ErrorCode::ClusterUnavailable
270        )
271    }
272
273    /// Check if this error indicates a failure to receive majority responses
274    pub fn is_propose_failure(&self) -> bool {
275        matches!(self, ErrorCode::ProposeFailed)
276    }
277
278    /// Check if this error indicates a retry required
279    pub fn is_retry_required(&self) -> bool {
280        matches!(self, ErrorCode::RetryRequired)
281    }
282}