dynomite/msg/request.rs
1//! Request lifecycle helpers.
2//!
3//! Request handling splits into two layers: request-side data
4//! manipulation (which fragments are done, which are in error,
5//! what error to send) and connection-side plumbing (timeout
6//! queues, recv/send done callbacks, peer forwarding).
7//!
8//! This module owns the data-side helpers; the connection-side
9//! helpers land in Stage 9 once the connection FSM exists.
10
11use crate::core::types::MsgId;
12
13use super::message::Msg;
14use super::queue::MsgQueue;
15
16/// Mark `req` as in-error with `error_code` (a libc errno-shaped
17/// value) and the matching `dyn_error_code`. The flag is set so the
18/// response path can synthesise an error reply on the next pass.
19///
20/// Returns `true` when the message transitions from healthy to
21/// error; subsequent calls are no-ops.
22///
23/// # Examples
24///
25/// ```
26/// use dynomite::msg::{request, DynErrorCode, Msg, MsgType};
27///
28/// let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
29/// assert!(request::set_error(&mut req, 13, DynErrorCode::PeerHostDown));
30/// assert!(req.flags().is_error);
31/// assert!(!request::set_error(&mut req, 13, DynErrorCode::PeerHostDown));
32/// ```
33pub fn set_error(req: &mut Msg, error_code: i32, dyn_error_code: super::DynErrorCode) -> bool {
34 if req.flags().is_error {
35 return false;
36 }
37 req.set_is_error(true);
38 req.flags_mut().done = true;
39 req.set_error_code(error_code);
40 req.set_dyn_error_code(dyn_error_code);
41 true
42}
43
44/// True when the request has been resolved end-to-end: a response
45/// has been selected, all fragments are accounted for, and (for
46/// fragment vectors) the parent fragment has finished aggregating.
47///
48/// The connection-coupled propagation pass that walks the client
49/// queue and marks every sibling fragment lands in Stage 9; this
50/// helper returns the data-shape answer for a single request.
51///
52/// # Examples
53///
54/// ```
55/// use dynomite::msg::{request, Msg, MsgType};
56///
57/// let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
58/// assert!(!request::is_done(&req));
59/// req.set_selected_rsp(Some(2));
60/// assert!(request::is_done(&req));
61/// ```
62#[must_use]
63pub fn is_done(req: &Msg) -> bool {
64 if req.selected_rsp().is_none() {
65 return false;
66 }
67 if req.fragment_ids().is_empty() {
68 return true;
69 }
70 req.flags().fdone
71}
72
73/// True when the request is in error: either marked directly or
74/// flagged through fragment-error propagation.
75///
76/// # Examples
77///
78/// ```
79/// use dynomite::msg::{request, DynErrorCode, Msg, MsgType};
80///
81/// let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
82/// assert!(!request::is_error(&req));
83/// request::set_error(&mut req, 13, DynErrorCode::PeerHostDown);
84/// assert!(request::is_error(&req));
85/// ```
86#[must_use]
87pub fn is_error(req: &Msg) -> bool {
88 req.flags().is_error || req.flags().is_ferror
89}
90
91/// Drain `from` of every request whose `selected_rsp` matches `id`
92/// and forward them to `to`.
93///
94/// This is the data-shape building block the connection-level
95/// `req_send_next` / `req_send_done` functions consume; it lets
96/// tests exercise the sibling-walk without standing up the full
97/// connection FSM.
98///
99/// # Examples
100///
101/// ```
102/// use dynomite::msg::{request, Msg, MsgQueue, MsgType};
103///
104/// let mut a = MsgQueue::new();
105/// let mut b = MsgQueue::new();
106/// let mut m = Msg::new(1, MsgType::ReqRedisGet, true);
107/// m.set_selected_rsp(Some(99));
108/// a.push_back(m);
109/// request::move_completed(&mut a, &mut b, 99);
110/// assert!(a.is_empty());
111/// assert_eq!(b.len(), 1);
112/// ```
113pub fn move_completed(from: &mut MsgQueue, to: &mut MsgQueue, id: MsgId) {
114 let mut keep = MsgQueue::new();
115 while let Some(msg) = from.pop_front() {
116 if msg.selected_rsp() == Some(id) {
117 to.push_back(msg);
118 } else {
119 keep.push_back(msg);
120 }
121 }
122 while let Some(msg) = keep.pop_front() {
123 from.push_back(msg);
124 }
125}
126
127#[cfg(test)]
128mod tests {
129 use super::*;
130 use crate::msg::{DynErrorCode, MsgType};
131
132 #[test]
133 fn error_flags_propagate() {
134 let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
135 assert!(set_error(&mut req, 7, DynErrorCode::PeerHostDown));
136 assert!(req.flags().is_error);
137 assert!(req.flags().done);
138 assert_eq!(req.error_code(), 7);
139 assert_eq!(req.dyn_error_code(), DynErrorCode::PeerHostDown);
140 assert!(is_error(&req));
141 assert!(!set_error(&mut req, 9, DynErrorCode::DynomiteUnknownError));
142 assert_eq!(req.error_code(), 7);
143 }
144
145 #[test]
146 fn fragmented_request_done_requires_fdone() {
147 let mut req = Msg::new(1, MsgType::ReqRedisMget, true);
148 req.push_fragment_id(2);
149 req.set_selected_rsp(Some(99));
150 assert!(!is_done(&req));
151 req.flags_mut().fdone = true;
152 assert!(is_done(&req));
153 }
154}