d_engine_proto/exts/
client_ext.rs1use bytes::Bytes;
7
8use crate::client::ClientReadRequest;
9use crate::client::ClientResponse;
10use crate::client::ClientResult;
11use crate::client::ReadConsistencyPolicy;
12use crate::client::ReadResults;
13use crate::client::WriteCommand;
14use crate::client::WriteResult;
15use crate::client::client_response::SuccessResult;
16use crate::client::write_command;
17use crate::error::ErrorCode;
18use crate::error::ErrorMetadata;
19
20impl ClientReadRequest {
21 pub fn has_consistency_policy(&self) -> bool {
26 self.consistency_policy.is_some()
27 }
28
29 pub fn get_consistency_policy(&self) -> Option<ReadConsistencyPolicy> {
34 self.consistency_policy.and_then(|policy_i32| {
35 match policy_i32 {
36 x if x == ReadConsistencyPolicy::LeaseRead as i32 => {
37 Some(ReadConsistencyPolicy::LeaseRead)
38 }
39 x if x == ReadConsistencyPolicy::LinearizableRead as i32 => {
40 Some(ReadConsistencyPolicy::LinearizableRead)
41 }
42 _ => None, }
44 })
45 }
46}
47
48impl WriteCommand {
49 pub fn insert(
55 key: impl Into<Bytes>,
56 value: impl Into<Bytes>,
57 ) -> Self {
58 let cmd = write_command::Insert {
59 key: key.into(),
60 value: value.into(),
61 ttl_secs: 0,
62 };
63 Self {
64 operation: Some(write_command::Operation::Insert(cmd)),
65 }
66 }
67
68 pub fn insert_with_ttl(
75 key: impl Into<Bytes>,
76 value: impl Into<Bytes>,
77 ttl_secs: u64,
78 ) -> Self {
79 let cmd = write_command::Insert {
80 key: key.into(),
81 value: value.into(),
82 ttl_secs,
83 };
84 Self {
85 operation: Some(write_command::Operation::Insert(cmd)),
86 }
87 }
88
89 pub fn delete(key: impl Into<Bytes>) -> Self {
94 let cmd = write_command::Delete { key: key.into() };
95 Self {
96 operation: Some(write_command::Operation::Delete(cmd)),
97 }
98 }
99
100 pub fn compare_and_swap(
107 key: impl Into<Bytes>,
108 expected_value: Option<impl Into<Bytes>>,
109 new_value: impl Into<Bytes>,
110 ) -> Self {
111 let cmd = write_command::CompareAndSwap {
112 key: key.into(),
113 expected_value: expected_value.map(|v| v.into()),
114 new_value: new_value.into(),
115 };
116 Self {
117 operation: Some(write_command::Operation::CompareAndSwap(cmd)),
118 }
119 }
120}
121
122impl ClientResponse {
123 pub fn write_success() -> Self {
128 Self {
129 error: ErrorCode::Success as i32,
130 success_result: Some(SuccessResult::WriteResult(WriteResult { succeeded: true })),
131 metadata: None,
132 }
133 }
134
135 pub fn cas_success() -> Self {
140 Self {
141 error: ErrorCode::Success as i32,
142 success_result: Some(SuccessResult::WriteResult(WriteResult { succeeded: true })),
143 metadata: None,
144 }
145 }
146
147 pub fn cas_failure() -> Self {
152 Self {
153 error: ErrorCode::Success as i32,
154 success_result: Some(SuccessResult::WriteResult(WriteResult { succeeded: false })),
155 metadata: None,
156 }
157 }
158
159 pub fn succeeded(&self) -> bool {
165 self.error == ErrorCode::Success as i32
166 && matches!(self.success_result, Some(SuccessResult::WriteResult(_)))
167 }
168
169 pub fn is_write_success(&self) -> bool {
178 self.error == ErrorCode::Success as i32
179 && matches!(
180 self.success_result,
181 Some(SuccessResult::WriteResult(WriteResult { succeeded: true }))
182 )
183 }
184
185 pub fn read_results(results: Vec<ClientResult>) -> Self {
190 Self {
191 error: ErrorCode::Success as i32,
192 success_result: Some(SuccessResult::ReadData(ReadResults { results })),
193 metadata: None,
194 }
195 }
196
197 pub fn client_error(error_code: ErrorCode) -> Self {
202 Self {
203 error: error_code as i32,
204 success_result: None,
205 metadata: None,
206 }
207 }
208
209 pub fn not_leader(
215 leader_id: Option<String>,
216 leader_address: Option<String>,
217 ) -> Self {
218 let metadata = if leader_id.is_some() || leader_address.is_some() {
219 Some(ErrorMetadata {
220 retry_after_ms: None,
221 leader_id,
222 leader_address,
223 debug_message: None,
224 })
225 } else {
226 None
227 };
228
229 Self {
230 error: ErrorCode::NotLeader as i32,
231 success_result: None,
232 metadata,
233 }
234 }
235
236 pub fn is_term_outdated(&self) -> bool {
238 ErrorCode::try_from(self.error).map(|e| e.is_term_outdated()).unwrap_or(false)
239 }
240
241 pub fn is_quorum_timeout_or_failure(&self) -> bool {
243 ErrorCode::try_from(self.error)
244 .map(|e| e.is_quorum_timeout_or_failure())
245 .unwrap_or(false)
246 }
247
248 pub fn is_propose_failure(&self) -> bool {
250 ErrorCode::try_from(self.error).map(|e| e.is_propose_failure()).unwrap_or(false)
251 }
252
253 pub fn is_retry_required(&self) -> bool {
255 ErrorCode::try_from(self.error).map(|e| e.is_retry_required()).unwrap_or(false)
256 }
257}
258
259impl ErrorCode {
260 pub fn is_term_outdated(&self) -> bool {
262 matches!(self, ErrorCode::TermOutdated)
263 }
264
265 pub fn is_quorum_timeout_or_failure(&self) -> bool {
267 matches!(
268 self,
269 ErrorCode::ConnectionTimeout | ErrorCode::ProposeFailed | ErrorCode::ClusterUnavailable
270 )
271 }
272
273 pub fn is_propose_failure(&self) -> bool {
275 matches!(self, ErrorCode::ProposeFailed)
276 }
277
278 pub fn is_retry_required(&self) -> bool {
280 matches!(self, ErrorCode::RetryRequired)
281 }
282}