d_engine_proto/exts/
client_ext.rs1use bytes::Bytes;
7
8use crate::client::ClientReadRequest;
9use crate::client::ClientResponse;
10use crate::client::ClientResult;
11use crate::client::ReadConsistencyPolicy;
12use crate::client::ReadResults;
13use crate::client::WriteCommand;
14use crate::client::client_response::SuccessResult;
15use crate::client::write_command;
16use crate::error::ErrorCode;
17use crate::error::ErrorMetadata;
18
19impl ClientReadRequest {
20 pub fn has_consistency_policy(&self) -> bool {
25 self.consistency_policy.is_some()
26 }
27
28 pub fn get_consistency_policy(&self) -> Option<ReadConsistencyPolicy> {
33 self.consistency_policy.and_then(|policy_i32| {
34 match policy_i32 {
35 x if x == ReadConsistencyPolicy::LeaseRead as i32 => {
36 Some(ReadConsistencyPolicy::LeaseRead)
37 }
38 x if x == ReadConsistencyPolicy::LinearizableRead as i32 => {
39 Some(ReadConsistencyPolicy::LinearizableRead)
40 }
41 _ => None, }
43 })
44 }
45}
46
47impl WriteCommand {
48 pub fn insert(
54 key: impl Into<Bytes>,
55 value: impl Into<Bytes>,
56 ) -> Self {
57 let cmd = write_command::Insert {
58 key: key.into(),
59 value: value.into(),
60 ttl_secs: 0,
61 };
62 Self {
63 operation: Some(write_command::Operation::Insert(cmd)),
64 }
65 }
66
67 pub fn insert_with_ttl(
74 key: impl Into<Bytes>,
75 value: impl Into<Bytes>,
76 ttl_secs: u64,
77 ) -> Self {
78 let cmd = write_command::Insert {
79 key: key.into(),
80 value: value.into(),
81 ttl_secs,
82 };
83 Self {
84 operation: Some(write_command::Operation::Insert(cmd)),
85 }
86 }
87
88 pub fn delete(key: impl Into<Bytes>) -> Self {
93 let cmd = write_command::Delete { key: key.into() };
94 Self {
95 operation: Some(write_command::Operation::Delete(cmd)),
96 }
97 }
98}
99
100impl ClientResponse {
101 pub fn write_success() -> Self {
106 Self {
107 error: ErrorCode::Success as i32,
108 success_result: Some(SuccessResult::WriteAck(true)),
109 metadata: None,
110 }
111 }
112
113 pub fn is_write_success(&self) -> bool {
119 self.error == ErrorCode::Success as i32
120 && matches!(self.success_result, Some(SuccessResult::WriteAck(true)))
121 }
122
123 pub fn read_results(results: Vec<ClientResult>) -> Self {
128 Self {
129 error: ErrorCode::Success as i32,
130 success_result: Some(SuccessResult::ReadData(ReadResults { results })),
131 metadata: None,
132 }
133 }
134
135 pub fn client_error(error_code: ErrorCode) -> Self {
140 Self {
141 error: error_code as i32,
142 success_result: None,
143 metadata: None,
144 }
145 }
146
147 pub fn not_leader(
153 leader_id: Option<String>,
154 leader_address: Option<String>,
155 ) -> Self {
156 let metadata = if leader_id.is_some() || leader_address.is_some() {
157 Some(ErrorMetadata {
158 retry_after_ms: None,
159 leader_id,
160 leader_address,
161 debug_message: None,
162 })
163 } else {
164 None
165 };
166
167 Self {
168 error: ErrorCode::NotLeader as i32,
169 success_result: None,
170 metadata,
171 }
172 }
173
174 pub fn is_term_outdated(&self) -> bool {
176 ErrorCode::try_from(self.error).map(|e| e.is_term_outdated()).unwrap_or(false)
177 }
178
179 pub fn is_quorum_timeout_or_failure(&self) -> bool {
181 ErrorCode::try_from(self.error)
182 .map(|e| e.is_quorum_timeout_or_failure())
183 .unwrap_or(false)
184 }
185
186 pub fn is_propose_failure(&self) -> bool {
188 ErrorCode::try_from(self.error).map(|e| e.is_propose_failure()).unwrap_or(false)
189 }
190
191 pub fn is_retry_required(&self) -> bool {
193 ErrorCode::try_from(self.error).map(|e| e.is_retry_required()).unwrap_or(false)
194 }
195}
196
197impl ErrorCode {
198 pub fn is_term_outdated(&self) -> bool {
200 matches!(self, ErrorCode::TermOutdated)
201 }
202
203 pub fn is_quorum_timeout_or_failure(&self) -> bool {
205 matches!(
206 self,
207 ErrorCode::ConnectionTimeout | ErrorCode::ProposeFailed | ErrorCode::ClusterUnavailable
208 )
209 }
210
211 pub fn is_propose_failure(&self) -> bool {
213 matches!(self, ErrorCode::ProposeFailed)
214 }
215
216 pub fn is_retry_required(&self) -> bool {
218 matches!(self, ErrorCode::RetryRequired)
219 }
220}