use super::message::Msg;
use super::msg_type::MsgType;
use super::DynErrorCode;
use crate::io::mbuf::MbufPool;
fn render_error_wire(err_type: MsgType, message: &str) -> Vec<u8> {
match err_type {
MsgType::RspRedisError => format!("-Dynomite: {message}\r\n").into_bytes(),
MsgType::RspRedisErrorErr => format!("-ERR {message}\r\n").into_bytes(),
MsgType::RspRedisErrorOom => format!("-OOM {message}\r\n").into_bytes(),
MsgType::RspRedisErrorBusy => format!("-BUSY {message}\r\n").into_bytes(),
MsgType::RspRedisErrorNoauth => format!("-NOAUTH {message}\r\n").into_bytes(),
MsgType::RspRedisErrorLoading => format!("-LOADING {message}\r\n").into_bytes(),
MsgType::RspRedisErrorBusykey => format!("-BUSYKEY {message}\r\n").into_bytes(),
MsgType::RspRedisErrorMisconf => format!("-MISCONF {message}\r\n").into_bytes(),
MsgType::RspRedisErrorNoscript => format!("-NOSCRIPT {message}\r\n").into_bytes(),
MsgType::RspRedisErrorReadonly => format!("-READONLY {message}\r\n").into_bytes(),
MsgType::RspRedisErrorWrongtype => format!("-WRONGTYPE {message}\r\n").into_bytes(),
MsgType::RspRedisErrorExecabort => format!("-EXECABORT {message}\r\n").into_bytes(),
MsgType::RspRedisErrorMasterdown => format!("-MASTERDOWN {message}\r\n").into_bytes(),
MsgType::RspRedisErrorNoreplicas => format!("-NOREPLICAS {message}\r\n").into_bytes(),
MsgType::RspMcServerError => format!("SERVER_ERROR {message}\r\n").into_bytes(),
MsgType::RspMcClientError => format!("CLIENT_ERROR {message}\r\n").into_bytes(),
MsgType::RspMcError => b"ERROR\r\n".to_vec(),
_ => Vec::new(),
}
}
/// Copies `bytes` into `rsp`'s mbuf chain, drawing as many buffers from
/// `pool` as needed, then refreshes the message length via
/// `recompute_mlen`.
///
/// Each iteration takes a fresh buffer, lets it absorb as much of the
/// remaining payload as `recv` reports, and appends it to the chain.
///
/// If a buffer accepts zero bytes, debug builds panic (the pool handed out
/// an unusable buffer). Release builds stop copying instead of looping
/// forever; the response then carries only the bytes written so far.
fn attach_payload(rsp: &mut Msg, pool: &MbufPool, bytes: &[u8]) {
    let mut written = 0usize;
    while written < bytes.len() {
        let mut buf = pool.get();
        let n = buf.recv(&bytes[written..]);
        debug_assert!(
            n > 0,
            "MbufPool returned a buffer with zero writable capacity"
        );
        if n == 0 {
            // `debug_assert!` is compiled out in release builds. Without this
            // guard `written` would never advance and the loop would keep
            // pushing empty buffers until memory is exhausted.
            break;
        }
        rsp.mbufs_mut().push_back(buf);
        written += n;
    }
    rsp.recompute_mlen();
}
/// Builds an error response for `req`.
///
/// The response is created with the request's id, has the request's id as
/// its parent id, is flagged as an error, and records both `error_code` and
/// `dyn_error_code`. Its payload is the wire rendering of
/// `dyn_error_code.message()` for `err_type`; when that rendering is empty
/// no buffers are attached.
///
/// # Panics
///
/// In debug builds, panics if `err_type` is not one of the error response
/// types this module knows how to render.
#[must_use]
pub fn make_error(
    req: &Msg,
    err_type: MsgType,
    error_code: i32,
    dyn_error_code: DynErrorCode,
    pool: &MbufPool,
) -> Msg {
    debug_assert!(
        matches!(
            err_type,
            MsgType::RspMcError
                | MsgType::RspMcClientError
                | MsgType::RspMcServerError
                | MsgType::RspRedisError
                | MsgType::RspRedisErrorErr
                | MsgType::RspRedisErrorOom
                | MsgType::RspRedisErrorBusy
                | MsgType::RspRedisErrorNoauth
                | MsgType::RspRedisErrorLoading
                | MsgType::RspRedisErrorBusykey
                | MsgType::RspRedisErrorMisconf
                | MsgType::RspRedisErrorNoscript
                | MsgType::RspRedisErrorReadonly
                | MsgType::RspRedisErrorWrongtype
                | MsgType::RspRedisErrorExecabort
                | MsgType::RspRedisErrorMasterdown
                | MsgType::RspRedisErrorNoreplicas
        ),
        "make_error called with non-error MsgType {err_type:?}"
    );

    // The response shares the request's id and also points back at it.
    let request_id = req.id();
    let mut response = Msg::new(request_id, err_type, false);
    response.set_parent_id(request_id);
    response.set_is_error(true);
    response.set_error_code(error_code);
    response.set_dyn_error_code(dyn_error_code);

    let payload = render_error_wire(err_type, dyn_error_code.message());
    if payload.is_empty() {
        // Nothing to send on the wire; leave the mbuf chain untouched.
        return response;
    }
    attach_payload(&mut response, pool, &payload);
    response
}
/// Builds a `RspRedisStatus` response for `req` whose body is `payload`.
///
/// The response is created with the request's id and has the request's id
/// as its parent id. The payload is copied into buffers taken from `pool`;
/// when it does not fit into a single buffer, further buffers are chained
/// instead of silently dropping the tail.
///
/// At least one buffer is always appended, even for an empty payload, so
/// payloads that fit in one buffer produce the same chain as before.
#[must_use]
pub fn make_simple_redis(req: &Msg, pool: &MbufPool, payload: &[u8]) -> Msg {
    let mut rsp = Msg::new(req.id(), MsgType::RspRedisStatus, false);
    rsp.set_parent_id(req.id());

    let mut written = 0usize;
    loop {
        let mut buf = pool.get();
        // `recv` reports how many bytes it accepted; anything it could not
        // take is carried over to the next buffer.
        let n = buf.recv(&payload[written..]);
        rsp.mbufs_mut().push_back(buf);
        written += n;
        // Stop once everything is copied, or when a buffer made no progress
        // (guards against spinning forever on a zero-capacity buffer).
        if n == 0 || written >= payload.len() {
            break;
        }
    }

    rsp.recompute_mlen();
    rsp
}
/// Marks `rsp` as the selected response of `req`.
///
/// Returns the id of the response that was selected before this call, or
/// `None` when the request had no selected response yet.
pub fn link(req: &mut Msg, rsp: &Msg) -> Option<crate::core::types::MsgId> {
    let displaced = req.selected_rsp();
    let chosen = rsp.id();
    req.set_selected_rsp(Some(chosen));
    displaced
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::msg::MsgType;

    /// Concatenates the readable bytes of every mbuf in `msg`, in chain order.
    fn wire_bytes(msg: &Msg) -> Vec<u8> {
        msg.mbufs()
            .iter()
            .flat_map(|b| b.readable().to_vec())
            .collect()
    }

    #[test]
    fn error_response_inherits_request_id() {
        let pool = MbufPool::default();
        let req = Msg::new(42, MsgType::ReqRedisGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspRedisError,
            13,
            DynErrorCode::DynomiteUnknownError,
            &pool,
        );
        assert_eq!(rsp.id(), 42);
        assert_eq!(rsp.parent_id(), 42);
        assert!(rsp.flags().is_error);
        assert_eq!(rsp.error_code(), 13);
    }

    #[test]
    fn make_error_redis_renders_dynomite_prefix() {
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspRedisError,
            0,
            DynErrorCode::DynomiteNoQuorumAchieved,
            &pool,
        );
        assert_eq!(
            wire_bytes(&rsp),
            b"-Dynomite: Failed to achieve Quorum\r\n".to_vec()
        );
        assert_eq!(rsp.mlen() as usize, wire_bytes(&rsp).len());
    }

    /// Every Redis error variant handled by `render_error_wire` must be
    /// listed here so a prefix typo in any arm is caught.
    #[test]
    fn make_error_typed_redis_variants_render_correct_prefixes() {
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let dyn_err = DynErrorCode::DynomiteUnknownError;
        let cases: &[(MsgType, &[u8])] = &[
            (MsgType::RspRedisError, b"-Dynomite: Unknown Error\r\n"),
            (MsgType::RspRedisErrorErr, b"-ERR Unknown Error\r\n"),
            (MsgType::RspRedisErrorOom, b"-OOM Unknown Error\r\n"),
            (MsgType::RspRedisErrorBusy, b"-BUSY Unknown Error\r\n"),
            (MsgType::RspRedisErrorNoauth, b"-NOAUTH Unknown Error\r\n"),
            (MsgType::RspRedisErrorLoading, b"-LOADING Unknown Error\r\n"),
            (MsgType::RspRedisErrorBusykey, b"-BUSYKEY Unknown Error\r\n"),
            (MsgType::RspRedisErrorMisconf, b"-MISCONF Unknown Error\r\n"),
            (
                MsgType::RspRedisErrorNoscript,
                b"-NOSCRIPT Unknown Error\r\n",
            ),
            (
                MsgType::RspRedisErrorReadonly,
                b"-READONLY Unknown Error\r\n",
            ),
            (
                MsgType::RspRedisErrorWrongtype,
                b"-WRONGTYPE Unknown Error\r\n",
            ),
            (
                MsgType::RspRedisErrorExecabort,
                b"-EXECABORT Unknown Error\r\n",
            ),
            (
                MsgType::RspRedisErrorMasterdown,
                b"-MASTERDOWN Unknown Error\r\n",
            ),
            (
                MsgType::RspRedisErrorNoreplicas,
                b"-NOREPLICAS Unknown Error\r\n",
            ),
        ];
        for (ty, expected) in cases {
            let rsp = make_error(&req, *ty, 0, dyn_err, &pool);
            assert_eq!(
                wire_bytes(&rsp),
                expected.to_vec(),
                "wire mismatch for {ty:?}"
            );
        }
    }

    #[test]
    fn make_error_memcache_renders_server_error() {
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqMcGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspMcServerError,
            0,
            DynErrorCode::DynomiteNoQuorumAchieved,
            &pool,
        );
        assert_eq!(
            wire_bytes(&rsp),
            b"SERVER_ERROR Failed to achieve Quorum\r\n".to_vec()
        );
    }

    #[test]
    fn make_error_memcache_renders_client_error() {
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqMcGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspMcClientError,
            0,
            DynErrorCode::DynomiteUnknownError,
            &pool,
        );
        assert_eq!(
            wire_bytes(&rsp),
            b"CLIENT_ERROR Unknown Error\r\n".to_vec()
        );
        assert_eq!(rsp.mlen() as usize, wire_bytes(&rsp).len());
    }

    #[test]
    fn make_error_memcache_error_bare() {
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqMcGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspMcError,
            0,
            DynErrorCode::DynomiteUnknownError,
            &pool,
        );
        assert_eq!(wire_bytes(&rsp), b"ERROR\r\n".to_vec());
        assert_eq!(rsp.mlen() as usize, wire_bytes(&rsp).len());
    }

    #[test]
    fn make_error_no_quorum_message_matches_dispatcher_log() {
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspRedisError,
            0,
            DynErrorCode::DynomiteNoQuorumAchieved,
            &pool,
        );
        let bytes = wire_bytes(&rsp);
        assert!(bytes.starts_with(b"-Dynomite: "));
        assert!(bytes.ends_with(b"\r\n"));
        // The text between prefix and CRLF must be exactly the message the
        // error code itself reports, not a separately maintained copy.
        let expected = format!(
            "-Dynomite: {}\r\n",
            DynErrorCode::DynomiteNoQuorumAchieved.message()
        );
        assert_eq!(bytes, expected.into_bytes());
    }

    #[test]
    fn make_simple_redis_carries_payload_and_request_id() {
        let pool = MbufPool::default();
        let req = Msg::new(7, MsgType::ReqRedisGet, true);
        let rsp = make_simple_redis(&req, &pool, b"+OK\r\n");
        assert_eq!(rsp.id(), 7);
        assert_eq!(rsp.parent_id(), 7);
        assert_eq!(wire_bytes(&rsp), b"+OK\r\n".to_vec());
        assert_eq!(rsp.mlen() as usize, wire_bytes(&rsp).len());
    }

    #[test]
    fn link_returns_previous() {
        let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
        let rsp1 = Msg::new(2, MsgType::RspRedisStatus, false);
        let rsp2 = Msg::new(3, MsgType::RspRedisStatus, false);
        assert!(link(&mut req, &rsp1).is_none());
        assert_eq!(link(&mut req, &rsp2), Some(2));
        assert_eq!(req.selected_rsp(), Some(3));
    }
}