Skip to main content

dynomite/msg/
request.rs

1//! Request lifecycle helpers.
2//!
3//! Request handling splits into two layers: request-side data
4//! manipulation (which fragments are done, which are in error,
5//! what error to send) and connection-side plumbing (timeout
6//! queues, recv/send done callbacks, peer forwarding).
7//!
8//! This module owns the data-side helpers; the connection-side
9//! helpers land in Stage 9 once the connection FSM exists.
10
11use crate::core::types::MsgId;
12
13use super::message::Msg;
14use super::queue::MsgQueue;
15
16/// Mark `req` as in-error with `error_code` (a libc errno-shaped
17/// value) and the matching `dyn_error_code`. The flag is set so the
18/// response path can synthesise an error reply on the next pass.
19///
20/// Returns `true` when the message transitions from healthy to
21/// error; subsequent calls are no-ops.
22///
23/// # Examples
24///
25/// ```
26/// use dynomite::msg::{request, DynErrorCode, Msg, MsgType};
27///
28/// let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
29/// assert!(request::set_error(&mut req, 13, DynErrorCode::PeerHostDown));
30/// assert!(req.flags().is_error);
31/// assert!(!request::set_error(&mut req, 13, DynErrorCode::PeerHostDown));
32/// ```
33pub fn set_error(req: &mut Msg, error_code: i32, dyn_error_code: super::DynErrorCode) -> bool {
34    if req.flags().is_error {
35        return false;
36    }
37    req.set_is_error(true);
38    req.flags_mut().done = true;
39    req.set_error_code(error_code);
40    req.set_dyn_error_code(dyn_error_code);
41    true
42}
43
44/// True when the request has been resolved end-to-end: a response
45/// has been selected, all fragments are accounted for, and (for
46/// fragment vectors) the parent fragment has finished aggregating.
47///
48/// The connection-coupled propagation pass that walks the client
49/// queue and marks every sibling fragment lands in Stage 9; this
50/// helper returns the data-shape answer for a single request.
51///
52/// # Examples
53///
54/// ```
55/// use dynomite::msg::{request, Msg, MsgType};
56///
57/// let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
58/// assert!(!request::is_done(&req));
59/// req.set_selected_rsp(Some(2));
60/// assert!(request::is_done(&req));
61/// ```
62#[must_use]
63pub fn is_done(req: &Msg) -> bool {
64    if req.selected_rsp().is_none() {
65        return false;
66    }
67    if req.fragment_ids().is_empty() {
68        return true;
69    }
70    req.flags().fdone
71}
72
73/// True when the request is in error: either marked directly or
74/// flagged through fragment-error propagation.
75///
76/// # Examples
77///
78/// ```
79/// use dynomite::msg::{request, DynErrorCode, Msg, MsgType};
80///
81/// let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
82/// assert!(!request::is_error(&req));
83/// request::set_error(&mut req, 13, DynErrorCode::PeerHostDown);
84/// assert!(request::is_error(&req));
85/// ```
86#[must_use]
87pub fn is_error(req: &Msg) -> bool {
88    req.flags().is_error || req.flags().is_ferror
89}
90
91/// Drain `from` of every request whose `selected_rsp` matches `id`
92/// and forward them to `to`.
93///
94/// This is the data-shape building block the connection-level
95/// `req_send_next` / `req_send_done` functions consume; it lets
96/// tests exercise the sibling-walk without standing up the full
97/// connection FSM.
98///
99/// # Examples
100///
101/// ```
102/// use dynomite::msg::{request, Msg, MsgQueue, MsgType};
103///
104/// let mut a = MsgQueue::new();
105/// let mut b = MsgQueue::new();
106/// let mut m = Msg::new(1, MsgType::ReqRedisGet, true);
107/// m.set_selected_rsp(Some(99));
108/// a.push_back(m);
109/// request::move_completed(&mut a, &mut b, 99);
110/// assert!(a.is_empty());
111/// assert_eq!(b.len(), 1);
112/// ```
113pub fn move_completed(from: &mut MsgQueue, to: &mut MsgQueue, id: MsgId) {
114    let mut keep = MsgQueue::new();
115    while let Some(msg) = from.pop_front() {
116        if msg.selected_rsp() == Some(id) {
117            to.push_back(msg);
118        } else {
119            keep.push_back(msg);
120        }
121    }
122    while let Some(msg) = keep.pop_front() {
123        from.push_back(msg);
124    }
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use crate::msg::{DynErrorCode, MsgType};

    // The first `set_error` call records the failure; a second call
    // must report `false` and leave the stored error code alone.
    #[test]
    fn error_flags_propagate() {
        let mut msg = Msg::new(1, MsgType::ReqRedisGet, true);

        let first = set_error(&mut msg, 7, DynErrorCode::PeerHostDown);
        assert!(first);
        assert!(msg.flags().is_error);
        assert!(msg.flags().done);
        assert_eq!(msg.error_code(), 7);
        assert_eq!(msg.dyn_error_code(), DynErrorCode::PeerHostDown);
        assert!(is_error(&msg));

        let second = set_error(&mut msg, 9, DynErrorCode::DynomiteUnknownError);
        assert!(!second);
        assert_eq!(msg.error_code(), 7);
    }

    // A request that carries fragment ids is not done on a selected
    // response alone; it also needs its `fdone` flag.
    #[test]
    fn fragmented_request_done_requires_fdone() {
        let mut mget = Msg::new(1, MsgType::ReqRedisMget, true);
        mget.push_fragment_id(2);
        mget.set_selected_rsp(Some(99));
        assert!(!is_done(&mget));

        mget.flags_mut().fdone = true;
        assert!(is_done(&mget));
    }
}
154}