d_engine_proto/exts/
client_ext.rs

1//! Extensions for protobuf-generated client types
2//!
3//! This module provides additional methods for protobuf-generated types
4//! that are not automatically generated by the protobuf compiler.
5
6use bytes::Bytes;
7
8use crate::client::ClientReadRequest;
9use crate::client::ClientResponse;
10use crate::client::ClientResult;
11use crate::client::ReadConsistencyPolicy;
12use crate::client::ReadResults;
13use crate::client::WriteCommand;
14use crate::client::client_response::SuccessResult;
15use crate::client::write_command;
16use crate::error::ErrorCode;
17use crate::error::ErrorMetadata;
18
19impl ClientReadRequest {
20    /// Checks if the consistency_policy field is present in the request
21    ///
22    /// Returns true if the client explicitly specified a consistency policy,
23    /// false if the field is absent (should use server default).
24    pub fn has_consistency_policy(&self) -> bool {
25        self.consistency_policy.is_some()
26    }
27
28    /// Gets the consistency policy value safely
29    ///
30    /// Returns Some(policy) if present, None if field is absent.
31    /// Safer alternative that doesn't panic.
32    pub fn get_consistency_policy(&self) -> Option<ReadConsistencyPolicy> {
33        self.consistency_policy.and_then(|policy_i32| {
34            match policy_i32 {
35                x if x == ReadConsistencyPolicy::LeaseRead as i32 => {
36                    Some(ReadConsistencyPolicy::LeaseRead)
37                }
38                x if x == ReadConsistencyPolicy::LinearizableRead as i32 => {
39                    Some(ReadConsistencyPolicy::LinearizableRead)
40                }
41                _ => None, // Invalid value
42            }
43        })
44    }
45}
46
47impl WriteCommand {
48    /// Create write command for key-value pair
49    ///
50    /// # Parameters
51    /// - `key`: Byte array for storage key
52    /// - `value`: Byte array to be stored
53    pub fn insert(
54        key: impl Into<Bytes>,
55        value: impl Into<Bytes>,
56    ) -> Self {
57        let cmd = write_command::Insert {
58            key: key.into(),
59            value: value.into(),
60            ttl_secs: 0,
61        };
62        Self {
63            operation: Some(write_command::Operation::Insert(cmd)),
64        }
65    }
66
67    /// Create write command for key-value pair with TTL
68    ///
69    /// # Parameters
70    /// - `key`: Byte array for storage key
71    /// - `value`: Byte array to be stored
72    /// - `ttl_secs`: Time-to-live in seconds
73    pub fn insert_with_ttl(
74        key: impl Into<Bytes>,
75        value: impl Into<Bytes>,
76        ttl_secs: u64,
77    ) -> Self {
78        let cmd = write_command::Insert {
79            key: key.into(),
80            value: value.into(),
81            ttl_secs,
82        };
83        Self {
84            operation: Some(write_command::Operation::Insert(cmd)),
85        }
86    }
87
88    /// Create deletion command for specified key
89    ///
90    /// # Parameters
91    /// - `key`: Byte array of key to delete
92    pub fn delete(key: impl Into<Bytes>) -> Self {
93        let cmd = write_command::Delete { key: key.into() };
94        Self {
95            operation: Some(write_command::Operation::Delete(cmd)),
96        }
97    }
98}
99
100impl ClientResponse {
101    /// Build success response for write operations
102    ///
103    /// # Returns
104    /// Response with NoError code and write confirmation
105    pub fn write_success() -> Self {
106        Self {
107            error: ErrorCode::Success as i32,
108            success_result: Some(SuccessResult::WriteAck(true)),
109            metadata: None,
110        }
111    }
112
113    /// Check if the write operation was successful
114    ///
115    /// # Returns
116    /// - `true` if the response indicates a successful write operation
117    /// - `false` if the response indicates a failed write operation or is not a write response
118    pub fn is_write_success(&self) -> bool {
119        self.error == ErrorCode::Success as i32
120            && matches!(self.success_result, Some(SuccessResult::WriteAck(true)))
121    }
122
123    /// Build success response for read operations
124    ///
125    /// # Parameters
126    /// - `results`: Vector of retrieved key-value pairs
127    pub fn read_results(results: Vec<ClientResult>) -> Self {
128        Self {
129            error: ErrorCode::Success as i32,
130            success_result: Some(SuccessResult::ReadData(ReadResults { results })),
131            metadata: None,
132        }
133    }
134
135    /// Build generic error response for any operation type
136    ///
137    /// # Parameters
138    /// - `error_code`: Predefined client request error code
139    pub fn client_error(error_code: ErrorCode) -> Self {
140        Self {
141            error: error_code as i32,
142            success_result: None,
143            metadata: None,
144        }
145    }
146
147    /// Build NOT_LEADER error response with leader metadata
148    ///
149    /// # Parameters
150    /// - `leader_id`: Optional leader node ID
151    /// - `leader_address`: Optional leader address
152    pub fn not_leader(
153        leader_id: Option<String>,
154        leader_address: Option<String>,
155    ) -> Self {
156        let metadata = if leader_id.is_some() || leader_address.is_some() {
157            Some(ErrorMetadata {
158                retry_after_ms: None,
159                leader_id,
160                leader_address,
161                debug_message: None,
162            })
163        } else {
164            None
165        };
166
167        Self {
168            error: ErrorCode::NotLeader as i32,
169            success_result: None,
170            metadata,
171        }
172    }
173
174    /// Check if this response indicates the leader's term is outdated
175    pub fn is_term_outdated(&self) -> bool {
176        ErrorCode::try_from(self.error).map(|e| e.is_term_outdated()).unwrap_or(false)
177    }
178
179    /// Check if this response indicates a quorum timeout or failure to receive majority responses
180    pub fn is_quorum_timeout_or_failure(&self) -> bool {
181        ErrorCode::try_from(self.error)
182            .map(|e| e.is_quorum_timeout_or_failure())
183            .unwrap_or(false)
184    }
185
186    /// Check if this response indicates a failure to receive majority responses
187    pub fn is_propose_failure(&self) -> bool {
188        ErrorCode::try_from(self.error).map(|e| e.is_propose_failure()).unwrap_or(false)
189    }
190
191    /// Check if this response indicates a a retry required
192    pub fn is_retry_required(&self) -> bool {
193        ErrorCode::try_from(self.error).map(|e| e.is_retry_required()).unwrap_or(false)
194    }
195}
196
197impl ErrorCode {
198    /// Check if this error indicates the leader's term is outdated
199    pub fn is_term_outdated(&self) -> bool {
200        matches!(self, ErrorCode::TermOutdated)
201    }
202
203    /// Check if this error indicates a quorum timeout or failure to receive majority responses
204    pub fn is_quorum_timeout_or_failure(&self) -> bool {
205        matches!(
206            self,
207            ErrorCode::ConnectionTimeout | ErrorCode::ProposeFailed | ErrorCode::ClusterUnavailable
208        )
209    }
210
211    /// Check if this error indicates a failure to receive majority responses
212    pub fn is_propose_failure(&self) -> bool {
213        matches!(self, ErrorCode::ProposeFailed)
214    }
215
216    /// Check if this error indicates a retry required
217    pub fn is_retry_required(&self) -> bool {
218        matches!(self, ErrorCode::RetryRequired)
219    }
220}