dynomite/msg/mod.rs
1//! Message model: the in-memory shape every request and response
2//! takes inside the engine, plus the queues, indices, and per-DC
3//! response managers that thread them through the dispatcher.
4//!
5//! This module exposes the data layer; the connection-coupled
6//! lifecycle (recv/send, timeout queues, peer forwarding) ships in
7//! Stage 9 once the connection state machine is in place. Helpers
8//! that already have a clean data-only definition (error response
9//! construction, fragment bookkeeping, quorum decisions) live here.
10//!
11//! # Examples
12//!
13//! ```
14//! use dynomite::msg::{Msg, MsgQueue, MsgType, ResponseMgr};
15//!
16//! let mut q = MsgQueue::new();
17//! q.push_back(Msg::new(1, MsgType::ReqRedisGet, true));
18//!
19//! let req = q.front().unwrap();
20//! let mgr = ResponseMgr::new(req, 1, None);
21//! assert_eq!(mgr.quorum_responses(), 1);
22//! ```
23
24use std::sync::OnceLock;
25
26pub mod index;
27pub mod keypos;
28pub mod message;
29pub mod msg_type;
30pub mod queue;
31pub mod request;
32pub mod response;
33pub mod response_mgr;
34
35pub use self::index::MsgIndex;
36pub use self::keypos::{ArgPos, KeyPos};
37pub use self::message::{ConnId, Msg, MsgFlags, MsgParseResult, MsgRouting};
38pub use self::msg_type::MsgType;
39pub use self::queue::MsgQueue;
40pub use self::response_mgr::{QuorumOutcome, ResponseMgr, MAX_REPLICAS_PER_DC};
41
/// Consistency level governing how many replica acknowledgements a
/// single message has to collect.
///
/// Discriminants are pinned explicitly so they line up with
/// `consistency_t` in the C reference.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Hash)]
#[repr(u8)]
pub enum ConsistencyLevel {
    /// A single replica acknowledgement is sufficient.
    #[default]
    DcOne = 0,
    /// A majority within the datacenter has to acknowledge.
    DcQuorum = 1,
    /// Same as `DcQuorum`, but the body checksums must agree as well;
    /// a mismatch triggers read repair.
    DcSafeQuorum = 2,
    /// `DcSafeQuorum`, decided separately for each datacenter.
    DcEachSafeQuorum = 3,
}

impl ConsistencyLevel {
    /// Returns the stable, uppercase string name of this level.
    ///
    /// # Examples
    ///
    /// ```
    /// use dynomite::msg::ConsistencyLevel;
    /// assert_eq!(ConsistencyLevel::DcQuorum.name(), "DC_QUORUM");
    /// ```
    #[must_use]
    pub fn name(self) -> &'static str {
        match self {
            Self::DcOne => "DC_ONE",
            Self::DcQuorum => "DC_QUORUM",
            Self::DcSafeQuorum => "DC_SAFE_QUORUM",
            Self::DcEachSafeQuorum => "DC_EACH_SAFE_QUORUM",
        }
    }

    /// Looks a level up by the exact string that [`Self::name`]
    /// produces for it.
    ///
    /// The comparison is case-sensitive; any other input yields
    /// `None`.
    ///
    /// # Examples
    ///
    /// ```
    /// use dynomite::msg::ConsistencyLevel;
    /// assert_eq!(
    ///     ConsistencyLevel::from_name("DC_SAFE_QUORUM"),
    ///     Some(ConsistencyLevel::DcSafeQuorum),
    /// );
    /// assert_eq!(ConsistencyLevel::from_name("DC_BOGUS"), None);
    /// ```
    #[must_use]
    pub fn from_name(name: &str) -> Option<Self> {
        // Search the variants through `name()` so the string table
        // exists in exactly one place.
        let candidates = [
            Self::DcOne,
            Self::DcQuorum,
            Self::DcSafeQuorum,
            Self::DcEachSafeQuorum,
        ];
        candidates
            .iter()
            .copied()
            .find(|candidate| candidate.name() == name)
    }
}
101
/// Dynomite-side error code carried in a message envelope.
///
/// Discriminants are explicit and follow `dyn_error_t` from the C
/// reference.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Hash)]
#[repr(u8)]
pub enum DynErrorCode {
    /// No error.
    #[default]
    Ok = 0,
    /// Unspecified error.
    DynomiteUnknownError = 1,
    /// Engine state forbids the request.
    DynomiteInvalidState = 2,
    /// Admin-only command issued in non-admin mode.
    DynomiteInvalidAdminReq = 3,
    /// Peer refused the connection.
    PeerConnectionRefuse = 4,
    /// Peer reachable but reported down.
    PeerHostDown = 5,
    /// Peer not yet connected.
    PeerHostNotConnected = 6,
    /// Datastore refused the connection.
    StorageConnectionRefuse = 7,
    /// Bad message framing.
    BadFormat = 8,
    /// Quorum not achieved.
    DynomiteNoQuorumAchieved = 9,
    /// Lua script keys span multiple nodes.
    DynomiteScriptSpansNodes = 10,
    /// Payload exceeds the configured limit.
    DynomitePayloadTooLarge = 11,
}

impl DynErrorCode {
    /// Human-readable label used in error responses, mirroring
    /// `dn_strerror` in the C reference.
    ///
    /// # Examples
    ///
    /// ```
    /// use dynomite::msg::DynErrorCode;
    /// assert_eq!(DynErrorCode::PeerHostDown.message(), "Peer Node is down");
    /// ```
    #[must_use]
    pub fn message(self) -> &'static str {
        match self {
            Self::Ok => "Success",
            Self::DynomiteUnknownError => "Unknown Error",
            Self::DynomiteInvalidState => "Dynomite's current state does not allow this request",
            Self::DynomiteInvalidAdminReq => "Invalid request in Dynomite's admin mode",
            Self::PeerConnectionRefuse => "Peer Node refused connection",
            Self::PeerHostDown => "Peer Node is down",
            Self::PeerHostNotConnected => "Peer Node is not connected",
            Self::StorageConnectionRefuse => "Datastore refused connection",
            Self::BadFormat => "Bad message format",
            Self::DynomiteNoQuorumAchieved => "Failed to achieve Quorum",
            Self::DynomiteScriptSpansNodes => "Keys in the script cannot span multiple nodes",
            Self::DynomitePayloadTooLarge => "MSET/MGET/SCAN payload too large",
        }
    }

    /// Origin label (`Dynomite:`, `Peer:`, `Storage:`, `unknown:`)
    /// used in error response prefixes, mirroring
    /// `dyn_error_source`.
    ///
    /// Every variant is classified explicitly: `Ok`,
    /// `DynomiteUnknownError` and `BadFormat` have no dedicated
    /// origin and report `unknown:`.
    ///
    /// # Examples
    ///
    /// ```
    /// use dynomite::msg::DynErrorCode;
    /// assert_eq!(DynErrorCode::PeerHostDown.source(), "Peer:");
    /// assert_eq!(DynErrorCode::BadFormat.source(), "unknown:");
    /// ```
    #[must_use]
    pub fn source(self) -> &'static str {
        match self {
            Self::DynomiteInvalidAdminReq
            | Self::DynomiteInvalidState
            | Self::DynomiteNoQuorumAchieved
            | Self::DynomiteScriptSpansNodes
            | Self::DynomitePayloadTooLarge => "Dynomite:",
            Self::PeerConnectionRefuse | Self::PeerHostDown | Self::PeerHostNotConnected => {
                "Peer:"
            }
            Self::StorageConnectionRefuse => "Storage:",
            // Listed by name instead of `_` so that adding a variant
            // is a compile error here until its origin is decided,
            // rather than silently reporting "unknown:".
            Self::Ok | Self::DynomiteUnknownError | Self::BadFormat => "unknown:",
        }
    }
}
193
/// Process-wide read-repair switch.
///
/// Unset until [`set_read_repairs_enabled`] installs a value; the
/// `OnceLock` accepts only the first write, so the flag never changes
/// afterwards.
static READ_REPAIRS_ENABLED: OnceLock<bool> = OnceLock::new();

/// Install the global read-repair setting.
///
/// Intended to be called once during configuration validation. Only
/// the first call takes effect: later calls leave the stored value
/// untouched and do not panic, mirroring the reference engine's
/// read-only `g_read_repairs_enabled` global.
///
/// Returns `true` when this call installed the value and `false`
/// when an earlier call had already pinned it.
///
/// # Examples
///
/// ```
/// use dynomite::msg::set_read_repairs_enabled;
/// // Whether this call wins depends on whether something else set
/// // the flag first, so the result is deliberately discarded.
/// let _ = set_read_repairs_enabled(true);
/// ```
pub fn set_read_repairs_enabled(enabled: bool) -> bool {
    let install = READ_REPAIRS_ENABLED.set(enabled);
    install.is_ok()
}

/// Reports whether read repairs are enabled cluster-wide.
///
/// Yields `false` while no value has been installed, matching the
/// reference engine's initial value of `g_read_repairs_enabled`.
///
/// # Examples
///
/// ```
/// use dynomite::msg::is_read_repairs_enabled;
/// // Reads as "disabled" unless a value was installed earlier.
/// let _ = is_read_repairs_enabled();
/// ```
#[must_use]
pub fn is_read_repairs_enabled() -> bool {
    READ_REPAIRS_ENABLED.get().copied().unwrap_or(false)
}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// Every level's name parses back to the same level, and an
    /// unrecognised name parses to nothing.
    #[test]
    fn consistency_round_trip() {
        let levels = [
            ConsistencyLevel::DcOne,
            ConsistencyLevel::DcQuorum,
            ConsistencyLevel::DcSafeQuorum,
            ConsistencyLevel::DcEachSafeQuorum,
        ];
        for level in levels {
            let recovered = ConsistencyLevel::from_name(level.name());
            assert_eq!(recovered, Some(level));
        }

        let bogus = ConsistencyLevel::from_name("DC_BOGUS");
        assert!(bogus.is_none());
    }

    /// Spot-checks the message and origin strings of a few codes.
    #[test]
    fn dyn_error_code_strings_match_c() {
        assert_eq!(DynErrorCode::Ok.message(), "Success");

        let origins = [
            (DynErrorCode::PeerHostDown, "Peer:"),
            (DynErrorCode::DynomiteUnknownError, "unknown:"),
            (DynErrorCode::StorageConnectionRefuse, "Storage:"),
        ];
        for (code, origin) in origins {
            assert_eq!(code.source(), origin);
        }
    }
}