1use super::message::Msg;
9use super::msg_type::MsgType;
10use super::DynErrorCode;
11use crate::io::mbuf::MbufPool;
12
13fn render_error_wire(err_type: MsgType, message: &str) -> Vec<u8> {
24 match err_type {
25 MsgType::RspRedisError => format!("-Dynomite: {message}\r\n").into_bytes(),
26 MsgType::RspRedisErrorErr => format!("-ERR {message}\r\n").into_bytes(),
27 MsgType::RspRedisErrorOom => format!("-OOM {message}\r\n").into_bytes(),
28 MsgType::RspRedisErrorBusy => format!("-BUSY {message}\r\n").into_bytes(),
29 MsgType::RspRedisErrorNoauth => format!("-NOAUTH {message}\r\n").into_bytes(),
30 MsgType::RspRedisErrorLoading => format!("-LOADING {message}\r\n").into_bytes(),
31 MsgType::RspRedisErrorBusykey => format!("-BUSYKEY {message}\r\n").into_bytes(),
32 MsgType::RspRedisErrorMisconf => format!("-MISCONF {message}\r\n").into_bytes(),
33 MsgType::RspRedisErrorNoscript => format!("-NOSCRIPT {message}\r\n").into_bytes(),
34 MsgType::RspRedisErrorReadonly => format!("-READONLY {message}\r\n").into_bytes(),
35 MsgType::RspRedisErrorWrongtype => format!("-WRONGTYPE {message}\r\n").into_bytes(),
36 MsgType::RspRedisErrorExecabort => format!("-EXECABORT {message}\r\n").into_bytes(),
37 MsgType::RspRedisErrorMasterdown => format!("-MASTERDOWN {message}\r\n").into_bytes(),
38 MsgType::RspRedisErrorNoreplicas => format!("-NOREPLICAS {message}\r\n").into_bytes(),
39 MsgType::RspMcServerError => format!("SERVER_ERROR {message}\r\n").into_bytes(),
40 MsgType::RspMcClientError => format!("CLIENT_ERROR {message}\r\n").into_bytes(),
41 MsgType::RspMcError => b"ERROR\r\n".to_vec(),
42 _ => Vec::new(),
46 }
47}
48
49fn attach_payload(rsp: &mut Msg, pool: &MbufPool, bytes: &[u8]) {
55 let mut written = 0usize;
56 while written < bytes.len() {
57 let mut buf = pool.get();
58 let n = buf.recv(&bytes[written..]);
59 debug_assert!(
60 n > 0,
61 "MbufPool returned a buffer with zero writable capacity"
62 );
63 rsp.mbufs_mut().push_back(buf);
64 written += n;
65 }
66 rsp.recompute_mlen();
67}
68
69#[must_use]
123pub fn make_error(
124 req: &Msg,
125 err_type: MsgType,
126 error_code: i32,
127 dyn_error_code: DynErrorCode,
128 pool: &MbufPool,
129) -> Msg {
130 debug_assert!(
131 matches!(
132 err_type,
133 MsgType::RspRedisError
134 | MsgType::RspRedisErrorErr
135 | MsgType::RspRedisErrorOom
136 | MsgType::RspRedisErrorBusy
137 | MsgType::RspRedisErrorNoauth
138 | MsgType::RspRedisErrorLoading
139 | MsgType::RspRedisErrorBusykey
140 | MsgType::RspRedisErrorMisconf
141 | MsgType::RspRedisErrorNoscript
142 | MsgType::RspRedisErrorReadonly
143 | MsgType::RspRedisErrorWrongtype
144 | MsgType::RspRedisErrorExecabort
145 | MsgType::RspRedisErrorMasterdown
146 | MsgType::RspRedisErrorNoreplicas
147 | MsgType::RspMcServerError
148 | MsgType::RspMcClientError
149 | MsgType::RspMcError
150 ),
151 "make_error called with non-error MsgType {err_type:?}"
152 );
153 let mut rsp = Msg::new(req.id(), err_type, false);
154 rsp.set_parent_id(req.id());
155 rsp.set_is_error(true);
156 rsp.set_error_code(error_code);
157 rsp.set_dyn_error_code(dyn_error_code);
158 let wire = render_error_wire(err_type, dyn_error_code.message());
159 if !wire.is_empty() {
160 attach_payload(&mut rsp, pool, &wire);
161 }
162 rsp
163}
164
165#[must_use]
189pub fn make_simple_redis(req: &Msg, pool: &MbufPool, payload: &[u8]) -> Msg {
190 let mut rsp = Msg::new(req.id(), MsgType::RspRedisStatus, false);
191 rsp.set_parent_id(req.id());
192 let mut buf = pool.get();
193 buf.recv(payload);
194 rsp.mbufs_mut().push_back(buf);
195 rsp.recompute_mlen();
196 rsp
197}
198
199pub fn link(req: &mut Msg, rsp: &Msg) -> Option<crate::core::types::MsgId> {
217 let prev = req.selected_rsp();
218 req.set_selected_rsp(Some(rsp.id()));
219 prev
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use crate::msg::MsgType;

    /// Concatenates the readable bytes of every buffer attached to `msg`.
    fn wire_bytes(msg: &Msg) -> Vec<u8> {
        msg.mbufs()
            .iter()
            .flat_map(|b| b.readable().to_vec())
            .collect()
    }

    #[test]
    fn error_response_inherits_request_id() {
        let pool = MbufPool::default();
        let req = Msg::new(42, MsgType::ReqRedisGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspRedisError,
            13,
            DynErrorCode::DynomiteUnknownError,
            &pool,
        );
        assert_eq!(rsp.id(), 42);
        assert_eq!(rsp.parent_id(), 42);
        assert!(rsp.flags().is_error);
        assert_eq!(rsp.error_code(), 13);
    }

    #[test]
    fn make_error_redis_renders_dynomite_prefix() {
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspRedisError,
            0,
            DynErrorCode::DynomiteNoQuorumAchieved,
            &pool,
        );
        assert_eq!(
            wire_bytes(&rsp),
            b"-Dynomite: Failed to achieve Quorum\r\n".to_vec()
        );
        assert_eq!(rsp.mlen() as usize, wire_bytes(&rsp).len());
    }

    #[test]
    fn make_error_typed_redis_variants_render_correct_prefixes() {
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let dyn_err = DynErrorCode::DynomiteUnknownError;
        // One row per Redis error type handled by `render_error_wire`.
        let cases: &[(MsgType, &[u8])] = &[
            (MsgType::RspRedisError, b"-Dynomite: Unknown Error\r\n"),
            (MsgType::RspRedisErrorErr, b"-ERR Unknown Error\r\n"),
            (MsgType::RspRedisErrorOom, b"-OOM Unknown Error\r\n"),
            (MsgType::RspRedisErrorBusy, b"-BUSY Unknown Error\r\n"),
            (MsgType::RspRedisErrorNoauth, b"-NOAUTH Unknown Error\r\n"),
            (MsgType::RspRedisErrorLoading, b"-LOADING Unknown Error\r\n"),
            (MsgType::RspRedisErrorBusykey, b"-BUSYKEY Unknown Error\r\n"),
            (MsgType::RspRedisErrorMisconf, b"-MISCONF Unknown Error\r\n"),
            (
                MsgType::RspRedisErrorNoscript,
                b"-NOSCRIPT Unknown Error\r\n",
            ),
            (
                MsgType::RspRedisErrorReadonly,
                b"-READONLY Unknown Error\r\n",
            ),
            (
                MsgType::RspRedisErrorWrongtype,
                b"-WRONGTYPE Unknown Error\r\n",
            ),
            (
                MsgType::RspRedisErrorExecabort,
                b"-EXECABORT Unknown Error\r\n",
            ),
            (
                MsgType::RspRedisErrorMasterdown,
                b"-MASTERDOWN Unknown Error\r\n",
            ),
            (
                MsgType::RspRedisErrorNoreplicas,
                b"-NOREPLICAS Unknown Error\r\n",
            ),
        ];
        for (ty, expected) in cases {
            let rsp = make_error(&req, *ty, 0, dyn_err, &pool);
            assert_eq!(
                wire_bytes(&rsp),
                expected.to_vec(),
                "wire mismatch for {ty:?}"
            );
        }
    }

    #[test]
    fn make_error_memcache_renders_server_error() {
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqMcGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspMcServerError,
            0,
            DynErrorCode::DynomiteNoQuorumAchieved,
            &pool,
        );
        assert_eq!(
            wire_bytes(&rsp),
            b"SERVER_ERROR Failed to achieve Quorum\r\n".to_vec()
        );
    }

    #[test]
    fn make_error_memcache_renders_client_error() {
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqMcGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspMcClientError,
            0,
            DynErrorCode::DynomiteUnknownError,
            &pool,
        );
        assert_eq!(
            wire_bytes(&rsp),
            b"CLIENT_ERROR Unknown Error\r\n".to_vec()
        );
        assert_eq!(rsp.mlen() as usize, wire_bytes(&rsp).len());
    }

    #[test]
    fn make_error_memcache_error_bare() {
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqMcGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspMcError,
            0,
            DynErrorCode::DynomiteUnknownError,
            &pool,
        );
        assert_eq!(wire_bytes(&rsp), b"ERROR\r\n".to_vec());
        assert_eq!(rsp.mlen() as usize, wire_bytes(&rsp).len());
    }

    #[test]
    fn make_error_no_quorum_message_matches_dispatcher_log() {
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspRedisError,
            0,
            DynErrorCode::DynomiteNoQuorumAchieved,
            &pool,
        );
        // Only the framing is pinned here; the exact message text is checked
        // by `make_error_redis_renders_dynomite_prefix`.
        let bytes = wire_bytes(&rsp);
        assert!(bytes.starts_with(b"-Dynomite: "));
        assert!(bytes.ends_with(b"\r\n"));
    }

    #[test]
    fn make_simple_redis_wraps_payload() {
        let pool = MbufPool::default();
        let req = Msg::new(7, MsgType::ReqRedisGet, true);
        let rsp = make_simple_redis(&req, &pool, b"+OK\r\n");
        assert_eq!(rsp.id(), 7);
        assert_eq!(rsp.parent_id(), 7);
        assert_eq!(wire_bytes(&rsp), b"+OK\r\n".to_vec());
        assert_eq!(rsp.mlen() as usize, wire_bytes(&rsp).len());
    }

    #[test]
    fn link_returns_previous() {
        let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
        let rsp1 = Msg::new(2, MsgType::RspRedisStatus, false);
        let rsp2 = Msg::new(3, MsgType::RspRedisStatus, false);
        assert!(link(&mut req, &rsp1).is_none());
        assert_eq!(link(&mut req, &rsp2), Some(2));
        assert_eq!(req.selected_rsp(), Some(3));
    }
}