Skip to main content

dynomite/msg/
response.rs

1//! Response lifecycle helpers.
2//!
3//! A small number of pure-data helpers - constructing error
4//! responses, pairing a response with its request - live here. The
5//! connection plumbing (response send / done, queue threading)
6//! ships in Stage 9.
7
8use super::message::Msg;
9use super::msg_type::MsgType;
10use super::DynErrorCode;
11use crate::io::mbuf::MbufPool;
12
13/// Render the on-the-wire error payload for `err_type` with the
14/// human-readable string supplied by `dyn_error_code`.
15///
16/// The Redis side mirrors RESP error replies: `RspRedisError` uses
17/// the synthetic Dynomite prefix, every typed `RspRedisError*`
18/// variant uses its matching wire token, and the unspecified ones
19/// fall back to `-ERR <message>\r\n`. The Memcache side uses
20/// `SERVER_ERROR <message>\r\n` for `RspMcServerError`,
21/// `CLIENT_ERROR <message>\r\n` for `RspMcClientError`, and the
22/// bare `ERROR\r\n` for `RspMcError`.
23fn render_error_wire(err_type: MsgType, message: &str) -> Vec<u8> {
24    match err_type {
25        MsgType::RspRedisError => format!("-Dynomite: {message}\r\n").into_bytes(),
26        MsgType::RspRedisErrorErr => format!("-ERR {message}\r\n").into_bytes(),
27        MsgType::RspRedisErrorOom => format!("-OOM {message}\r\n").into_bytes(),
28        MsgType::RspRedisErrorBusy => format!("-BUSY {message}\r\n").into_bytes(),
29        MsgType::RspRedisErrorNoauth => format!("-NOAUTH {message}\r\n").into_bytes(),
30        MsgType::RspRedisErrorLoading => format!("-LOADING {message}\r\n").into_bytes(),
31        MsgType::RspRedisErrorBusykey => format!("-BUSYKEY {message}\r\n").into_bytes(),
32        MsgType::RspRedisErrorMisconf => format!("-MISCONF {message}\r\n").into_bytes(),
33        MsgType::RspRedisErrorNoscript => format!("-NOSCRIPT {message}\r\n").into_bytes(),
34        MsgType::RspRedisErrorReadonly => format!("-READONLY {message}\r\n").into_bytes(),
35        MsgType::RspRedisErrorWrongtype => format!("-WRONGTYPE {message}\r\n").into_bytes(),
36        MsgType::RspRedisErrorExecabort => format!("-EXECABORT {message}\r\n").into_bytes(),
37        MsgType::RspRedisErrorMasterdown => format!("-MASTERDOWN {message}\r\n").into_bytes(),
38        MsgType::RspRedisErrorNoreplicas => format!("-NOREPLICAS {message}\r\n").into_bytes(),
39        MsgType::RspMcServerError => format!("SERVER_ERROR {message}\r\n").into_bytes(),
40        MsgType::RspMcClientError => format!("CLIENT_ERROR {message}\r\n").into_bytes(),
41        MsgType::RspMcError => b"ERROR\r\n".to_vec(),
42        // Non-error variants are rejected by the debug_assert in
43        // `make_error`. Returning an empty payload here keeps the
44        // function total without papering over the contract.
45        _ => Vec::new(),
46    }
47}
48
49/// Append `bytes` to `rsp` as one or more mbufs drawn from `pool`.
50///
51/// All current call sites produce error payloads that are well under
52/// any sane chunk size, but the loop keeps the helper correct for
53/// future growth (longer messages, accumulated framing).
54fn attach_payload(rsp: &mut Msg, pool: &MbufPool, bytes: &[u8]) {
55    let mut written = 0usize;
56    while written < bytes.len() {
57        let mut buf = pool.get();
58        let n = buf.recv(&bytes[written..]);
59        debug_assert!(
60            n > 0,
61            "MbufPool returned a buffer with zero writable capacity"
62        );
63        rsp.mbufs_mut().push_back(buf);
64        written += n;
65    }
66    rsp.recompute_mlen();
67}
68
69/// Build a synthetic error response for `req`.
70///
71/// The constructed message inherits the request's id (so the
72/// dispatcher can pair them), sets `is_request` to false, marks the
73/// response as in-error, stamps the error codes, and attaches the
74/// rendered wire-format error string in one or more mbufs taken
75/// from `pool` so the client driver actually has bytes to write
76/// back.
77///
78/// The wire formats produced are:
79///
80/// * Redis (RESP):
81///   * `RspRedisError`: `-Dynomite: <message>\r\n`
82///   * `RspRedisErrorErr`: `-ERR <message>\r\n`
83///   * `RspRedisErrorOom`: `-OOM <message>\r\n`
84///   * `RspRedisErrorBusy`: `-BUSY <message>\r\n`
85///   * `RspRedisErrorNoauth`: `-NOAUTH <message>\r\n`
86///   * `RspRedisErrorLoading`: `-LOADING <message>\r\n`
87///   * `RspRedisErrorBusykey`: `-BUSYKEY <message>\r\n`
88///   * `RspRedisErrorMisconf`: `-MISCONF <message>\r\n`
89///   * `RspRedisErrorNoscript`: `-NOSCRIPT <message>\r\n`
90///   * `RspRedisErrorReadonly`: `-READONLY <message>\r\n`
91///   * `RspRedisErrorWrongtype`: `-WRONGTYPE <message>\r\n`
92///   * `RspRedisErrorExecabort`: `-EXECABORT <message>\r\n`
93///   * `RspRedisErrorMasterdown`: `-MASTERDOWN <message>\r\n`
94///   * `RspRedisErrorNoreplicas`: `-NOREPLICAS <message>\r\n`
95/// * Memcache (text):
96///   * `RspMcServerError`: `SERVER_ERROR <message>\r\n`
97///   * `RspMcClientError`: `CLIENT_ERROR <message>\r\n`
98///   * `RspMcError`: `ERROR\r\n`
99///
100/// `<message>` is [`DynErrorCode::message`].
101///
102/// # Examples
103///
104/// ```
105/// use dynomite::io::mbuf::MbufPool;
106/// use dynomite::msg::{response, DynErrorCode, Msg, MsgType};
107///
108/// let req = Msg::new(7, MsgType::ReqRedisGet, true);
109/// let pool = MbufPool::default();
110/// let rsp = response::make_error(
111///     &req,
112///     MsgType::RspRedisError,
113///     13,
114///     DynErrorCode::PeerHostDown,
115///     &pool,
116/// );
117/// assert_eq!(rsp.parent_id(), 7);
118/// assert!(rsp.flags().is_error);
119/// let bytes: Vec<u8> = rsp.mbufs().iter().flat_map(|b| b.readable().to_vec()).collect();
120/// assert_eq!(bytes, b"-Dynomite: Peer Node is down\r\n".to_vec());
121/// ```
122#[must_use]
123pub fn make_error(
124    req: &Msg,
125    err_type: MsgType,
126    error_code: i32,
127    dyn_error_code: DynErrorCode,
128    pool: &MbufPool,
129) -> Msg {
130    debug_assert!(
131        matches!(
132            err_type,
133            MsgType::RspRedisError
134                | MsgType::RspRedisErrorErr
135                | MsgType::RspRedisErrorOom
136                | MsgType::RspRedisErrorBusy
137                | MsgType::RspRedisErrorNoauth
138                | MsgType::RspRedisErrorLoading
139                | MsgType::RspRedisErrorBusykey
140                | MsgType::RspRedisErrorMisconf
141                | MsgType::RspRedisErrorNoscript
142                | MsgType::RspRedisErrorReadonly
143                | MsgType::RspRedisErrorWrongtype
144                | MsgType::RspRedisErrorExecabort
145                | MsgType::RspRedisErrorMasterdown
146                | MsgType::RspRedisErrorNoreplicas
147                | MsgType::RspMcServerError
148                | MsgType::RspMcClientError
149                | MsgType::RspMcError
150        ),
151        "make_error called with non-error MsgType {err_type:?}"
152    );
153    let mut rsp = Msg::new(req.id(), err_type, false);
154    rsp.set_parent_id(req.id());
155    rsp.set_is_error(true);
156    rsp.set_error_code(error_code);
157    rsp.set_dyn_error_code(dyn_error_code);
158    let wire = render_error_wire(err_type, dyn_error_code.message());
159    if !wire.is_empty() {
160        attach_payload(&mut rsp, pool, &wire);
161    }
162    rsp
163}
164
165/// Build a synthetic Redis-status response carrying `payload` as
166/// the on-the-wire reply bytes.
167///
168/// The constructed message inherits the request's id (so the
169/// dispatcher can pair them), sets `is_request` to false, marks
170/// the type as [`MsgType::RspRedisStatus`], and attaches a single
171/// mbuf containing `payload` (verbatim, no encoding). Use this
172/// for synthesized replies whose wire form is fixed (`+OK\r\n`,
173/// `+PONG\r\n`, ...).
174///
175/// # Examples
176///
177/// ```
178/// use dynomite::io::mbuf::MbufPool;
179/// use dynomite::msg::{response, Msg, MsgType};
180///
181/// let req = Msg::new(7, MsgType::ReqRedisQuit, true);
182/// let pool = MbufPool::default();
183/// let rsp = response::make_simple_redis(&req, &pool, b"+OK\r\n");
184/// assert_eq!(rsp.id(), 7);
185/// assert_eq!(rsp.ty(), MsgType::RspRedisStatus);
186/// assert_eq!(rsp.mlen(), 5);
187/// ```
188#[must_use]
189pub fn make_simple_redis(req: &Msg, pool: &MbufPool, payload: &[u8]) -> Msg {
190    let mut rsp = Msg::new(req.id(), MsgType::RspRedisStatus, false);
191    rsp.set_parent_id(req.id());
192    let mut buf = pool.get();
193    buf.recv(payload);
194    rsp.mbufs_mut().push_back(buf);
195    rsp.recompute_mlen();
196    rsp
197}
198
199/// Pair a response with its request: stamps the response's parent
200/// id and sets the request's `selected_rsp` to the response id.
201///
202/// Returns the previous selected-response id, if any, so callers can
203/// release the now-stale response.
204///
205/// # Examples
206///
207/// ```
208/// use dynomite::msg::{response, Msg, MsgType};
209///
210/// let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
211/// let rsp = Msg::new(2, MsgType::RspRedisStatus, false);
212/// let prev = response::link(&mut req, &rsp);
213/// assert!(prev.is_none());
214/// assert_eq!(req.selected_rsp(), Some(2));
215/// ```
216pub fn link(req: &mut Msg, rsp: &Msg) -> Option<crate::core::types::MsgId> {
217    let prev = req.selected_rsp();
218    req.set_selected_rsp(Some(rsp.id()));
219    prev
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use crate::msg::MsgType;

    /// Concatenate the readable bytes of every mbuf attached to `msg`,
    /// i.e. the payload as it would appear on the wire.
    fn wire_bytes(msg: &Msg) -> Vec<u8> {
        msg.mbufs()
            .iter()
            .flat_map(|b| b.readable().to_vec())
            .collect()
    }

    #[test]
    fn error_response_inherits_request_id() {
        let pool = MbufPool::default();
        let req = Msg::new(42, MsgType::ReqRedisGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspRedisError,
            13,
            DynErrorCode::DynomiteUnknownError,
            &pool,
        );
        assert_eq!(rsp.id(), 42);
        assert_eq!(rsp.parent_id(), 42);
        assert!(rsp.flags().is_error);
        assert_eq!(rsp.error_code(), 13);
    }

    #[test]
    fn make_error_redis_renders_dynomite_prefix() {
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspRedisError,
            0,
            DynErrorCode::DynomiteNoQuorumAchieved,
            &pool,
        );
        assert_eq!(
            wire_bytes(&rsp),
            b"-Dynomite: Failed to achieve Quorum\r\n".to_vec()
        );
        assert_eq!(rsp.mlen() as usize, wire_bytes(&rsp).len());
    }

    #[test]
    fn make_error_typed_redis_variants_render_correct_prefixes() {
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let dyn_err = DynErrorCode::DynomiteUnknownError;
        // One row per Redis error response type accepted by `make_error`.
        let cases: &[(MsgType, &[u8])] = &[
            (MsgType::RspRedisError, b"-Dynomite: Unknown Error\r\n"),
            (MsgType::RspRedisErrorErr, b"-ERR Unknown Error\r\n"),
            (MsgType::RspRedisErrorOom, b"-OOM Unknown Error\r\n"),
            (MsgType::RspRedisErrorBusy, b"-BUSY Unknown Error\r\n"),
            (MsgType::RspRedisErrorNoauth, b"-NOAUTH Unknown Error\r\n"),
            (MsgType::RspRedisErrorLoading, b"-LOADING Unknown Error\r\n"),
            (MsgType::RspRedisErrorBusykey, b"-BUSYKEY Unknown Error\r\n"),
            (MsgType::RspRedisErrorMisconf, b"-MISCONF Unknown Error\r\n"),
            (
                MsgType::RspRedisErrorNoscript,
                b"-NOSCRIPT Unknown Error\r\n",
            ),
            (
                MsgType::RspRedisErrorReadonly,
                b"-READONLY Unknown Error\r\n",
            ),
            (
                MsgType::RspRedisErrorWrongtype,
                b"-WRONGTYPE Unknown Error\r\n",
            ),
            (
                MsgType::RspRedisErrorExecabort,
                b"-EXECABORT Unknown Error\r\n",
            ),
            (
                MsgType::RspRedisErrorMasterdown,
                b"-MASTERDOWN Unknown Error\r\n",
            ),
            (
                MsgType::RspRedisErrorNoreplicas,
                b"-NOREPLICAS Unknown Error\r\n",
            ),
        ];
        for (ty, expected) in cases {
            let rsp = make_error(&req, *ty, 0, dyn_err, &pool);
            assert_eq!(
                wire_bytes(&rsp),
                expected.to_vec(),
                "wire mismatch for {ty:?}"
            );
            assert_eq!(
                rsp.mlen() as usize,
                expected.len(),
                "mlen mismatch for {ty:?}"
            );
        }
    }

    #[test]
    fn make_error_memcache_renders_server_error() {
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqMcGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspMcServerError,
            0,
            DynErrorCode::DynomiteNoQuorumAchieved,
            &pool,
        );
        assert_eq!(
            wire_bytes(&rsp),
            b"SERVER_ERROR Failed to achieve Quorum\r\n".to_vec()
        );
    }

    #[test]
    fn make_error_memcache_renders_client_error() {
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqMcGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspMcClientError,
            0,
            DynErrorCode::DynomiteUnknownError,
            &pool,
        );
        assert_eq!(
            wire_bytes(&rsp),
            b"CLIENT_ERROR Unknown Error\r\n".to_vec()
        );
        assert_eq!(rsp.mlen() as usize, wire_bytes(&rsp).len());
    }

    #[test]
    fn make_error_memcache_error_bare() {
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqMcGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspMcError,
            0,
            DynErrorCode::DynomiteUnknownError,
            &pool,
        );
        assert_eq!(wire_bytes(&rsp), b"ERROR\r\n".to_vec());
        assert_eq!(rsp.mlen() as usize, wire_bytes(&rsp).len());
    }

    #[test]
    fn make_error_no_quorum_message_matches_dispatcher_log() {
        // Dispatcher's `NoTargets` path emits a
        // `DynomiteNoQuorumAchieved` envelope; the wire form must
        // surface a human-readable reason rather than a hang.
        let pool = MbufPool::default();
        let req = Msg::new(1, MsgType::ReqRedisGet, true);
        let rsp = make_error(
            &req,
            MsgType::RspRedisError,
            0,
            DynErrorCode::DynomiteNoQuorumAchieved,
            &pool,
        );
        let bytes = wire_bytes(&rsp);
        assert!(bytes.starts_with(b"-Dynomite: "));
        assert!(bytes.ends_with(b"\r\n"));
    }

    #[test]
    fn make_simple_redis_attaches_payload_verbatim() {
        let pool = MbufPool::default();
        let req = Msg::new(7, MsgType::ReqRedisQuit, true);
        let rsp = make_simple_redis(&req, &pool, b"+OK\r\n");
        assert_eq!(rsp.id(), 7);
        assert_eq!(rsp.parent_id(), 7);
        assert_eq!(rsp.ty(), MsgType::RspRedisStatus);
        assert_eq!(wire_bytes(&rsp), b"+OK\r\n".to_vec());
        assert_eq!(rsp.mlen() as usize, wire_bytes(&rsp).len());
    }

    #[test]
    fn link_returns_previous() {
        let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
        let rsp1 = Msg::new(2, MsgType::RspRedisStatus, false);
        let rsp2 = Msg::new(3, MsgType::RspRedisStatus, false);
        assert!(link(&mut req, &rsp1).is_none());
        assert_eq!(link(&mut req, &rsp2), Some(2));
        assert_eq!(req.selected_rsp(), Some(3));
    }
}