Skip to main content

dynomite/msg/
mod.rs

1//! Message model: the in-memory shape every request and response
2//! takes inside the engine, plus the queues, indices, and per-DC
3//! response managers that thread them through the dispatcher.
4//!
5//! This module exposes the data layer; the connection-coupled
6//! lifecycle (recv/send, timeout queues, peer forwarding) ships in
7//! Stage 9 once the connection state machine is in place. Helpers
8//! that already have a clean data-only definition (error response
9//! construction, fragment bookkeeping, quorum decisions) live here.
10//!
11//! # Examples
12//!
13//! ```
14//! use dynomite::msg::{Msg, MsgQueue, MsgType, ResponseMgr};
15//!
16//! let mut q = MsgQueue::new();
17//! q.push_back(Msg::new(1, MsgType::ReqRedisGet, true));
18//!
19//! let req = q.front().unwrap();
20//! let mgr = ResponseMgr::new(req, 1, None);
21//! assert_eq!(mgr.quorum_responses(), 1);
22//! ```
23
24use std::sync::OnceLock;
25
26pub mod index;
27pub mod keypos;
28pub mod message;
29pub mod msg_type;
30pub mod queue;
31pub mod request;
32pub mod response;
33pub mod response_mgr;
34
35pub use self::index::MsgIndex;
36pub use self::keypos::{ArgPos, KeyPos};
37pub use self::message::{ConnId, Msg, MsgFlags, MsgParseResult, MsgRouting};
38pub use self::msg_type::MsgType;
39pub use self::queue::MsgQueue;
40pub use self::response_mgr::{QuorumOutcome, ResponseMgr, MAX_REPLICAS_PER_DC};
41
42/// Cluster consistency level applied to a single message.
43///
44/// The numeric values match `consistency_t` in the C reference.
45#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Hash)]
46#[repr(u8)]
47pub enum ConsistencyLevel {
48    /// Wait for one replica to ack.
49    #[default]
50    DcOne = 0,
51    /// Wait for a per-DC majority.
52    DcQuorum = 1,
53    /// `DcQuorum` with body checksum agreement; mismatches trigger
54    /// read repair.
55    DcSafeQuorum = 2,
56    /// `DcSafeQuorum` evaluated independently per datacenter.
57    DcEachSafeQuorum = 3,
58}
59
60impl ConsistencyLevel {
61    /// Stable string name.
62    ///
63    /// # Examples
64    ///
65    /// ```
66    /// use dynomite::msg::ConsistencyLevel;
67    /// assert_eq!(ConsistencyLevel::DcQuorum.name(), "DC_QUORUM");
68    /// ```
69    #[must_use]
70    pub fn name(self) -> &'static str {
71        match self {
72            ConsistencyLevel::DcOne => "DC_ONE",
73            ConsistencyLevel::DcQuorum => "DC_QUORUM",
74            ConsistencyLevel::DcSafeQuorum => "DC_SAFE_QUORUM",
75            ConsistencyLevel::DcEachSafeQuorum => "DC_EACH_SAFE_QUORUM",
76        }
77    }
78
79    /// Recover the level from its uppercase name.
80    ///
81    /// # Examples
82    ///
83    /// ```
84    /// use dynomite::msg::ConsistencyLevel;
85    /// assert_eq!(
86    ///     ConsistencyLevel::from_name("DC_SAFE_QUORUM"),
87    ///     Some(ConsistencyLevel::DcSafeQuorum),
88    /// );
89    /// ```
90    #[must_use]
91    pub fn from_name(name: &str) -> Option<Self> {
92        match name {
93            "DC_ONE" => Some(Self::DcOne),
94            "DC_QUORUM" => Some(Self::DcQuorum),
95            "DC_SAFE_QUORUM" => Some(Self::DcSafeQuorum),
96            "DC_EACH_SAFE_QUORUM" => Some(Self::DcEachSafeQuorum),
97            _ => None,
98        }
99    }
100}
101
102/// Dynomite-side error code carried in a message envelope.
103///
104/// Matches `dyn_error_t` from the C reference verbatim.
105#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Hash)]
106#[repr(u8)]
107pub enum DynErrorCode {
108    /// No error.
109    #[default]
110    Ok = 0,
111    /// Unspecified error.
112    DynomiteUnknownError = 1,
113    /// Engine state forbids the request.
114    DynomiteInvalidState = 2,
115    /// Admin-only command issued in non-admin mode.
116    DynomiteInvalidAdminReq = 3,
117    /// Peer refused the connection.
118    PeerConnectionRefuse = 4,
119    /// Peer reachable but reported down.
120    PeerHostDown = 5,
121    /// Peer not yet connected.
122    PeerHostNotConnected = 6,
123    /// Datastore refused the connection.
124    StorageConnectionRefuse = 7,
125    /// Bad message framing.
126    BadFormat = 8,
127    /// Quorum not achieved.
128    DynomiteNoQuorumAchieved = 9,
129    /// Lua script keys span multiple nodes.
130    DynomiteScriptSpansNodes = 10,
131    /// Payload exceeds the configured limit.
132    DynomitePayloadTooLarge = 11,
133}
134
135impl DynErrorCode {
136    /// Human-readable label used in error responses, mirroring
137    /// `dn_strerror` in the C reference.
138    ///
139    /// # Examples
140    ///
141    /// ```
142    /// use dynomite::msg::DynErrorCode;
143    /// assert_eq!(DynErrorCode::PeerHostDown.message(), "Peer Node is down");
144    /// ```
145    #[must_use]
146    pub fn message(self) -> &'static str {
147        match self {
148            DynErrorCode::Ok => "Success",
149            DynErrorCode::DynomiteUnknownError => "Unknown Error",
150            DynErrorCode::DynomiteInvalidState => {
151                "Dynomite's current state does not allow this request"
152            }
153            DynErrorCode::DynomiteInvalidAdminReq => "Invalid request in Dynomite's admin mode",
154            DynErrorCode::PeerConnectionRefuse => "Peer Node refused connection",
155            DynErrorCode::PeerHostDown => "Peer Node is down",
156            DynErrorCode::PeerHostNotConnected => "Peer Node is not connected",
157            DynErrorCode::StorageConnectionRefuse => "Datastore refused connection",
158            DynErrorCode::BadFormat => "Bad message format",
159            DynErrorCode::DynomiteNoQuorumAchieved => "Failed to achieve Quorum",
160            DynErrorCode::DynomiteScriptSpansNodes => {
161                "Keys in the script cannot span multiple nodes"
162            }
163            DynErrorCode::DynomitePayloadTooLarge => "MSET/MGET/SCAN payload too large",
164        }
165    }
166
167    /// Origin label (`Dynomite:`, `Peer:`, `Storage:`, `unknown:`)
168    /// used in error response prefixes, mirroring
169    /// `dyn_error_source`.
170    ///
171    /// # Examples
172    ///
173    /// ```
174    /// use dynomite::msg::DynErrorCode;
175    /// assert_eq!(DynErrorCode::PeerHostDown.source(), "Peer:");
176    /// ```
177    #[must_use]
178    pub fn source(self) -> &'static str {
179        match self {
180            DynErrorCode::DynomiteInvalidAdminReq
181            | DynErrorCode::DynomiteInvalidState
182            | DynErrorCode::DynomiteNoQuorumAchieved
183            | DynErrorCode::DynomiteScriptSpansNodes
184            | DynErrorCode::DynomitePayloadTooLarge => "Dynomite:",
185            DynErrorCode::PeerConnectionRefuse
186            | DynErrorCode::PeerHostDown
187            | DynErrorCode::PeerHostNotConnected => "Peer:",
188            DynErrorCode::StorageConnectionRefuse => "Storage:",
189            _ => "unknown:",
190        }
191    }
192}
193
194static READ_REPAIRS_ENABLED: OnceLock<bool> = OnceLock::new();
195
196/// Configure whether read repairs are globally enabled.
197///
198/// Called once during configuration validation; subsequent calls are
199/// silently ignored to mirror the reference engine's read-only
200/// `g_read_repairs_enabled` global. Returns `true` when the value was
201/// installed and `false` when an earlier call already pinned it.
202///
203/// # Examples
204///
205/// ```
206/// use dynomite::msg::set_read_repairs_enabled;
207/// // Calling twice from a single test wins or loses depending on
208/// // whether anyone else got there first; the API is idempotent.
209/// let _ = set_read_repairs_enabled(true);
210/// ```
211pub fn set_read_repairs_enabled(enabled: bool) -> bool {
212    READ_REPAIRS_ENABLED.set(enabled).is_ok()
213}
214
215/// True when read repairs are enabled cluster-wide.
216///
217/// Defaults to `false`, matching the reference engine's initial
218/// value of `g_read_repairs_enabled`.
219///
220/// # Examples
221///
222/// ```
223/// use dynomite::msg::is_read_repairs_enabled;
224/// // Default state is "disabled".
225/// let _ = is_read_repairs_enabled();
226/// ```
227#[must_use]
228pub fn is_read_repairs_enabled() -> bool {
229    *READ_REPAIRS_ENABLED.get().unwrap_or(&false)
230}
231
232#[cfg(test)]
233mod tests {
234    use super::*;
235
236    #[test]
237    fn consistency_round_trip() {
238        for level in [
239            ConsistencyLevel::DcOne,
240            ConsistencyLevel::DcQuorum,
241            ConsistencyLevel::DcSafeQuorum,
242            ConsistencyLevel::DcEachSafeQuorum,
243        ] {
244            assert_eq!(ConsistencyLevel::from_name(level.name()), Some(level));
245        }
246        assert!(ConsistencyLevel::from_name("DC_BOGUS").is_none());
247    }
248
249    #[test]
250    fn dyn_error_code_strings_match_c() {
251        assert_eq!(DynErrorCode::Ok.message(), "Success");
252        assert_eq!(DynErrorCode::PeerHostDown.source(), "Peer:");
253        assert_eq!(DynErrorCode::DynomiteUnknownError.source(), "unknown:");
254        assert_eq!(DynErrorCode::StorageConnectionRefuse.source(), "Storage:");
255    }
256}