Skip to main content

dynomite/msg/
message.rs

//! In-memory representation of a single Dynomite message.
//!
//! [`Msg`] is the unit that flows through the engine: a request from
//! a client connection, a response from the upstream datastore, or
//! an internal control packet. It carries the parsed metadata, the
//! mbuf chain holding the on-the-wire bytes, the parser state used
//! by the protocol decoders, and the bookkeeping flags every layer
//! sets and reads.
//!
//! This stage builds the data shape and exposes the field-level
//! accessors. The connection-coupled lifecycle paths (timeout
//! tracking, queue threading, parser dispatch) land in Stage 9 once
//! the connection state machine exists.
14
use crate::core::types::MsgId;
use crate::io::mbuf::MbufQueue;
use crate::proto::dnode::{Dmsg, DynParseState};

use super::keypos::{ArgPos, KeyPos};
use super::msg_type::MsgType;
use super::response_mgr::ResponseMgr;
22
/// Stable connection identifier carried by [`Msg::owner`].
///
/// Stage 9 replaces this alias with a typed `Conn` reference. Until
/// then, the value is just a unique 64-bit tag the connection layer
/// stamps on every message it produces.
pub type ConnId = u64;
29
/// Parser outcome reported by datastore protocol decoders.
///
/// The variants mirror the reference engine's `msg_parse_result_t`
/// so downstream callers can dispatch on the same semantics. The
/// default value is [`MsgParseResult::Ok`].
#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
pub enum MsgParseResult {
    /// Parsing completed for this message.
    #[default]
    Ok,
    /// Parser detected an unrecoverable framing error.
    Error,
    /// Parser consumed valid bytes but expects to be re-driven on
    /// the trailing bytes after a buffer split.
    Repair,
    /// Multi-key request needs to be fragmented before forwarding.
    Fragment,
    /// Need more bytes; caller must read more before retrying.
    Again,
    /// Parsing succeeded; downstream layer should take no action.
    Noop,
    /// Message was a Dynomite configuration directive.
    DynoConfig,
    /// Out-of-memory while parsing.
    OomError,
}
55
/// Routing override applied to a request.
///
/// Mirrors the reference engine's `msg_routing_t`. The default
/// (`Normal`) honors the configured key-hash routing; the other
/// variants short-circuit it for diagnostic and special-purpose
/// paths. The explicit discriminants are part of the type's layout
/// (`#[repr(u8)]`) and must not be renumbered.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
#[repr(u8)]
pub enum MsgRouting {
    /// Apply the standard key-hash routing.
    #[default]
    Normal = 0,
    /// Send to the local node only, ignoring the key hash.
    LocalNodeOnly = 1,
    /// Apply the key hash but stay within the local rack.
    TokenOwnerLocalRackOnly = 2,
    /// Send to every node in the local rack, ignoring the key hash.
    AllNodesLocalRackOnly = 3,
    /// Send to every node in every rack of every datacenter.
    AllNodesAllRacksAllDcs = 4,
}

impl MsgRouting {
    /// Stable string name for diagnostic output.
    ///
    /// Every variant maps to a fixed `ROUTING_*` literal; the match
    /// is exhaustive so adding a variant forces this table to be
    /// extended.
    ///
    /// # Examples
    ///
    /// ```
    /// use dynomite::msg::MsgRouting;
    /// assert_eq!(MsgRouting::Normal.name(), "ROUTING_NORMAL");
    /// ```
    #[must_use]
    pub fn name(self) -> &'static str {
        match self {
            MsgRouting::Normal => "ROUTING_NORMAL",
            MsgRouting::LocalNodeOnly => "ROUTING_LOCAL_NODE_ONLY",
            MsgRouting::TokenOwnerLocalRackOnly => "ROUTING_TOKEN_OWNER_LOCAL_RACK_ONLY",
            MsgRouting::AllNodesLocalRackOnly => "ROUTING_ALL_NODES_LOCAL_RACK_ONLY",
            MsgRouting::AllNodesAllRacksAllDcs => "ROUTING_ALL_NODES_ALL_RACKS_ALL_DCS",
        }
    }
}
98
/// Bag of boolean lifecycle flags used by the request and response
/// pipelines.
///
/// The derived `Default` clears every flag; messages built through
/// `Msg::new` start from `MsgFlags::default_for_msg` instead, which
/// turns three of them on.
// The flags are independent booleans rather than states of one
// machine, so the clippy suggestion to fold them into an enum does
// not apply.
#[allow(clippy::struct_excessive_bools)]
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub struct MsgFlags {
    /// Message is currently in the error state.
    pub is_error: bool,
    /// At least one fragment of this multi-message vector errored.
    pub is_ferror: bool,
    /// Caller issued a `quit` request.
    pub quit: bool,
    /// Caller expects the datastore to produce a reply.
    pub expect_datastore_reply: bool,
    /// Reply has been routed to the caller.
    pub done: bool,
    /// Every fragment of this vector finished.
    pub fdone: bool,
    /// Discard the corresponding response on arrival.
    pub swallow: bool,
    /// A DNODE header has already been prepended to the mbuf chain.
    pub dnode_header_prepended: bool,
    /// Response for this request has been written to the wire.
    pub rsp_sent: bool,
    /// Read request (vs write).
    pub is_read: bool,
    /// Marked by the dispatcher that a repair must be issued.
    pub needs_repair: bool,
    /// Request body can be safely rewritten with a timestamp.
    pub rewrite_with_ts_possible: bool,
    /// Response managers have been initialised.
    pub rspmgrs_inited: bool,
}

impl MsgFlags {
    /// Construct flags with the same default state the reference
    /// engine sets in `_msg_get`.
    ///
    /// `expect_datastore_reply`, `is_read` and
    /// `rewrite_with_ts_possible` start set; every other flag starts
    /// cleared.
    #[must_use]
    fn default_for_msg() -> Self {
        Self {
            expect_datastore_reply: true,
            is_read: true,
            rewrite_with_ts_possible: true,
            ..Self::default()
        }
    }
}
145
146/// One Dynomite message: the in-memory representation of a request
147/// or a response on its way through the engine.
148#[derive(Debug)]
149pub struct Msg {
150    id: MsgId,
151    parent_id: MsgId,
152    ty: MsgType,
153    orig_type: MsgType,
154    is_request: bool,
155    mbufs: MbufQueue,
156    mlen: u32,
157    parse_result: MsgParseResult,
158    dyn_parse_state: DynParseState,
159    dmsg: Option<Dmsg>,
160    routing: MsgRouting,
161    consistency: super::ConsistencyLevel,
162    timestamp_us: u64,
163    error_code: i32,
164    dyn_error_code: super::DynErrorCode,
165    awaiting_rsps: u32,
166    fragment_ids: Vec<MsgId>,
167    response_ids: Vec<MsgId>,
168    selected_rsp: Option<MsgId>,
169    owner: Option<ConnId>,
170    flags: MsgFlags,
171    rspmgr: Option<ResponseMgr>,
172    additional_rspmgrs: Vec<ResponseMgr>,
173    parser_state: u32,
174    parser_pos: usize,
175    parser_token: Option<usize>,
176    rlen: u32,
177    rntokens: u32,
178    ntokens: u32,
179    nkeys: u32,
180    vlen: u32,
181    integer: i64,
182    keys: Vec<KeyPos>,
183    args: Vec<ArgPos>,
184    end_marker: Option<usize>,
185    ntoken_start: Option<usize>,
186    ntoken_end: Option<usize>,
187    frag_id: u64,
188}
189
190impl Msg {
191    /// Construct a new message with `id`, type tag `ty`, and the
192    /// request/response orientation `is_request`. The mbuf chain
193    /// starts empty and the parser is reset to `DynParseState::Start`.
194    ///
195    /// # Examples
196    ///
197    /// ```
198    /// use dynomite::msg::{Msg, MsgType};
199    /// let m = Msg::new(1, MsgType::ReqRedisGet, true);
200    /// assert_eq!(m.id(), 1);
201    /// assert!(m.is_request());
202    /// ```
203    #[must_use]
204    pub fn new(id: MsgId, ty: MsgType, is_request: bool) -> Self {
205        Self {
206            id,
207            parent_id: 0,
208            ty,
209            orig_type: MsgType::Unknown,
210            is_request,
211            mbufs: MbufQueue::new(),
212            mlen: 0,
213            parse_result: MsgParseResult::default(),
214            dyn_parse_state: DynParseState::Start,
215            dmsg: None,
216            routing: MsgRouting::Normal,
217            consistency: super::ConsistencyLevel::DcOne,
218            timestamp_us: 0,
219            error_code: 0,
220            dyn_error_code: super::DynErrorCode::Ok,
221            awaiting_rsps: 0,
222            fragment_ids: Vec::new(),
223            response_ids: Vec::new(),
224            selected_rsp: None,
225            owner: None,
226            flags: MsgFlags::default_for_msg(),
227            rspmgr: None,
228            additional_rspmgrs: Vec::new(),
229            parser_state: 0,
230            parser_pos: 0,
231            parser_token: None,
232            rlen: 0,
233            rntokens: 0,
234            ntokens: 0,
235            nkeys: 0,
236            vlen: 0,
237            integer: 0,
238            keys: Vec::new(),
239            args: Vec::new(),
240            end_marker: None,
241            ntoken_start: None,
242            ntoken_end: None,
243            frag_id: 0,
244        }
245    }
246
247    /// Borrow the parsed key list. Populated by the protocol parsers.
248    ///
249    /// # Examples
250    /// ```
251    /// use dynomite::msg::{Msg, MsgType};
252    /// assert!(Msg::new(1, MsgType::Unknown, true).keys().is_empty());
253    /// ```
254    #[must_use]
255    pub fn keys(&self) -> &[KeyPos] {
256        &self.keys
257    }
258
259    /// Mutably borrow the parsed key list.
260    pub fn keys_mut(&mut self) -> &mut Vec<KeyPos> {
261        &mut self.keys
262    }
263
264    /// Append a parsed key. Used by the protocol parsers.
265    pub fn push_key(&mut self, k: KeyPos) {
266        self.keys.push(k);
267    }
268
269    /// Borrow the parsed argument list.
270    #[must_use]
271    pub fn args(&self) -> &[ArgPos] {
272        &self.args
273    }
274
275    /// Mutably borrow the parsed argument list.
276    pub fn args_mut(&mut self) -> &mut Vec<ArgPos> {
277        &mut self.args
278    }
279
280    /// Append a parsed argument.
281    pub fn push_arg(&mut self, a: ArgPos) {
282        self.args.push(a);
283    }
284
285    /// Protocol-specific parser state index. Each parser defines
286    /// its own state alphabet keyed on this `u32`.
287    #[must_use]
288    pub fn parser_state(&self) -> u32 {
289        self.parser_state
290    }
291
292    /// Set the protocol parser state index.
293    pub fn set_parser_state(&mut self, s: u32) {
294        self.parser_state = s;
295    }
296
297    /// Cursor offset into the input buffer where the next byte
298    /// should be read.
299    #[must_use]
300    pub fn parser_pos(&self) -> usize {
301        self.parser_pos
302    }
303
304    /// Set the parser cursor offset.
305    pub fn set_parser_pos(&mut self, p: usize) {
306        self.parser_pos = p;
307    }
308
309    /// Optional token marker offset.
310    #[must_use]
311    pub fn parser_token(&self) -> Option<usize> {
312        self.parser_token
313    }
314
315    /// Set the optional token marker offset.
316    pub fn set_parser_token(&mut self, t: Option<usize>) {
317        self.parser_token = t;
318    }
319
320    /// Remaining length of the bulk argument the parser is currently
321    /// consuming.
322    #[must_use]
323    pub fn rlen(&self) -> u32 {
324        self.rlen
325    }
326
327    /// Set the bulk-argument remaining length.
328    pub fn set_rlen(&mut self, n: u32) {
329        self.rlen = n;
330    }
331
332    /// Remaining unprocessed token count for the current parse.
333    #[must_use]
334    pub fn rntokens(&self) -> u32 {
335        self.rntokens
336    }
337
338    /// Set the remaining-token counter.
339    pub fn set_rntokens(&mut self, n: u32) {
340        self.rntokens = n;
341    }
342
343    /// Total parsed token count for the current message.
344    #[must_use]
345    pub fn ntokens(&self) -> u32 {
346        self.ntokens
347    }
348
349    /// Set the total parsed token count.
350    pub fn set_ntokens(&mut self, n: u32) {
351        self.ntokens = n;
352    }
353
354    /// Number of keys the script (EVAL/EVALSHA) declared.
355    #[must_use]
356    pub fn nkeys(&self) -> u32 {
357        self.nkeys
358    }
359
360    /// Set the script-key count.
361    pub fn set_nkeys(&mut self, n: u32) {
362        self.nkeys = n;
363    }
364
365    /// Storage-command value length.
366    #[must_use]
367    pub fn vlen(&self) -> u32 {
368        self.vlen
369    }
370
371    /// Set the storage-command value length.
372    pub fn set_vlen(&mut self, n: u32) {
373        self.vlen = n;
374    }
375
376    /// Integer value carried by the response (`:n\r\n`).
377    #[must_use]
378    pub fn integer(&self) -> i64 {
379        self.integer
380    }
381
382    /// Set the integer response value.
383    pub fn set_integer(&mut self, v: i64) {
384        self.integer = v;
385    }
386
387    /// Offset of the multi-bulk `END` marker in the response, if any.
388    #[must_use]
389    pub fn end_marker(&self) -> Option<usize> {
390        self.end_marker
391    }
392
393    /// Set the response `END` marker offset.
394    pub fn set_end_marker(&mut self, m: Option<usize>) {
395        self.end_marker = m;
396    }
397
398    /// Start offset of the multi-bulk argument count token.
399    #[must_use]
400    pub fn ntoken_start(&self) -> Option<usize> {
401        self.ntoken_start
402    }
403
404    /// End offset (exclusive) of the multi-bulk argument count token.
405    #[must_use]
406    pub fn ntoken_end(&self) -> Option<usize> {
407        self.ntoken_end
408    }
409
410    /// Set the multi-bulk argument count span.
411    pub fn set_ntoken_span(&mut self, start: Option<usize>, end: Option<usize>) {
412        self.ntoken_start = start;
413        self.ntoken_end = end;
414    }
415
416    /// Fragment id grouping all sub-messages produced from a
417    /// multi-key request.
418    #[must_use]
419    pub fn frag_id(&self) -> u64 {
420        self.frag_id
421    }
422
423    /// Set the fragment id.
424    pub fn set_frag_id(&mut self, id: u64) {
425        self.frag_id = id;
426    }
427
428    /// Message id.
429    ///
430    /// # Examples
431    /// ```
432    /// use dynomite::msg::{Msg, MsgType};
433    /// assert_eq!(Msg::new(99, MsgType::Unknown, true).id(), 99);
434    /// ```
435    #[must_use]
436    pub fn id(&self) -> MsgId {
437        self.id
438    }
439
440    /// Parent id (zero when not a fragment).
441    ///
442    /// # Examples
443    /// ```
444    /// use dynomite::msg::{Msg, MsgType};
445    /// assert_eq!(Msg::new(1, MsgType::Unknown, true).parent_id(), 0);
446    /// ```
447    #[must_use]
448    pub fn parent_id(&self) -> MsgId {
449        self.parent_id
450    }
451
452    /// Set the parent id.
453    ///
454    /// # Examples
455    /// ```
456    /// use dynomite::msg::{Msg, MsgType};
457    /// let mut m = Msg::new(2, MsgType::Unknown, true);
458    /// m.set_parent_id(1);
459    /// assert_eq!(m.parent_id(), 1);
460    /// ```
461    pub fn set_parent_id(&mut self, parent: MsgId) {
462        self.parent_id = parent;
463    }
464
465    /// Message type tag.
466    ///
467    /// # Examples
468    /// ```
469    /// use dynomite::msg::{Msg, MsgType};
470    /// assert_eq!(Msg::new(1, MsgType::ReqMcGet, true).ty(), MsgType::ReqMcGet);
471    /// ```
472    #[must_use]
473    pub fn ty(&self) -> MsgType {
474        self.ty
475    }
476
477    /// Override the message type. The previous value is preserved as
478    /// the original type so query rewriters can recover it.
479    ///
480    /// # Examples
481    /// ```
482    /// use dynomite::msg::{Msg, MsgType};
483    /// let mut m = Msg::new(1, MsgType::ReqRedisGet, true);
484    /// m.set_type(MsgType::ReqRedisSet);
485    /// assert_eq!(m.ty(), MsgType::ReqRedisSet);
486    /// assert_eq!(m.orig_type(), MsgType::ReqRedisGet);
487    /// ```
488    pub fn set_type(&mut self, ty: MsgType) {
489        self.orig_type = self.ty;
490        self.ty = ty;
491    }
492
493    /// Original message type before any rewrite.
494    ///
495    /// # Examples
496    /// ```
497    /// use dynomite::msg::{Msg, MsgType};
498    /// assert_eq!(
499    ///     Msg::new(1, MsgType::ReqRedisGet, true).orig_type(),
500    ///     MsgType::Unknown,
501    /// );
502    /// ```
503    #[must_use]
504    pub fn orig_type(&self) -> MsgType {
505        self.orig_type
506    }
507
508    /// True for requests.
509    ///
510    /// # Examples
511    /// ```
512    /// use dynomite::msg::{Msg, MsgType};
513    /// assert!(Msg::new(1, MsgType::ReqMcGet, true).is_request());
514    /// assert!(!Msg::new(1, MsgType::RspMcStored, false).is_request());
515    /// ```
516    #[must_use]
517    pub fn is_request(&self) -> bool {
518        self.is_request
519    }
520
521    /// Borrow the underlying mbuf chain.
522    ///
523    /// # Examples
524    /// ```
525    /// use dynomite::msg::{Msg, MsgType};
526    /// let m = Msg::new(1, MsgType::Unknown, true);
527    /// assert!(m.mbufs().is_empty());
528    /// ```
529    #[must_use]
530    pub fn mbufs(&self) -> &MbufQueue {
531        &self.mbufs
532    }
533
534    /// Mutably borrow the mbuf chain.
535    ///
536    /// # Examples
537    /// ```
538    /// use dynomite::io::mbuf::MbufPool;
539    /// use dynomite::msg::{Msg, MsgType};
540    ///
541    /// let pool = MbufPool::default();
542    /// let mut m = Msg::new(1, MsgType::Unknown, true);
543    /// m.mbufs_mut().push_back(pool.get());
544    /// assert_eq!(m.mbufs().len(), 1);
545    /// ```
546    pub fn mbufs_mut(&mut self) -> &mut MbufQueue {
547        &mut self.mbufs
548    }
549
550    /// Cumulative readable byte count of the chain (`mlen`).
551    ///
552    /// # Examples
553    /// ```
554    /// use dynomite::msg::{Msg, MsgType};
555    /// assert_eq!(Msg::new(1, MsgType::Unknown, true).mlen(), 0);
556    /// ```
557    #[must_use]
558    pub fn mlen(&self) -> u32 {
559        self.mlen
560    }
561
562    /// Refresh `mlen` from the current chain.
563    ///
564    /// The parser updates the length as it consumes bytes; callers
565    /// that mutate the chain directly call this to keep the cached
566    /// length consistent with the actual chain content.
567    ///
568    /// # Examples
569    /// ```
570    /// use dynomite::io::mbuf::MbufPool;
571    /// use dynomite::msg::{Msg, MsgType};
572    ///
573    /// let pool = MbufPool::default();
574    /// let mut m = Msg::new(1, MsgType::Unknown, true);
575    /// let mut buf = pool.get();
576    /// buf.recv(b"hi");
577    /// m.mbufs_mut().push_back(buf);
578    /// m.recompute_mlen();
579    /// assert_eq!(m.mlen(), 2);
580    /// ```
581    pub fn recompute_mlen(&mut self) {
582        let total: usize = self.mbufs.iter().map(crate::io::mbuf::Mbuf::len).sum();
583        self.mlen = u32::try_from(total).unwrap_or(u32::MAX);
584    }
585
586    /// Direct setter for `mlen`. Use [`Msg::recompute_mlen`] when the
587    /// chain has been mutated; this entry point exists for parsers
588    /// that adjust the value as they consume bytes.
589    ///
590    /// # Examples
591    /// ```
592    /// use dynomite::msg::{Msg, MsgType};
593    /// let mut m = Msg::new(1, MsgType::Unknown, true);
594    /// m.set_mlen(123);
595    /// assert_eq!(m.mlen(), 123);
596    /// ```
597    pub fn set_mlen(&mut self, mlen: u32) {
598        self.mlen = mlen;
599    }
600
601    /// Last parse outcome.
602    ///
603    /// # Examples
604    /// ```
605    /// use dynomite::msg::{Msg, MsgParseResult, MsgType};
606    /// assert_eq!(
607    ///     Msg::new(1, MsgType::Unknown, true).parse_result(),
608    ///     MsgParseResult::Ok,
609    /// );
610    /// ```
611    #[must_use]
612    pub fn parse_result(&self) -> MsgParseResult {
613        self.parse_result
614    }
615
616    /// Set the parse outcome.
617    ///
618    /// # Examples
619    /// ```
620    /// use dynomite::msg::{Msg, MsgParseResult, MsgType};
621    /// let mut m = Msg::new(1, MsgType::Unknown, true);
622    /// m.set_parse_result(MsgParseResult::Again);
623    /// assert_eq!(m.parse_result(), MsgParseResult::Again);
624    /// ```
625    pub fn set_parse_result(&mut self, r: MsgParseResult) {
626        self.parse_result = r;
627    }
628
629    /// Current DNODE parser state.
630    ///
631    /// # Examples
632    /// ```
633    /// use dynomite::msg::{Msg, MsgType};
634    /// use dynomite::proto::dnode::DynParseState;
635    /// assert_eq!(
636    ///     Msg::new(1, MsgType::Unknown, true).dyn_parse_state(),
637    ///     DynParseState::Start,
638    /// );
639    /// ```
640    #[must_use]
641    pub fn dyn_parse_state(&self) -> DynParseState {
642        self.dyn_parse_state
643    }
644
645    /// Set the DNODE parser state.
646    pub fn set_dyn_parse_state(&mut self, state: DynParseState) {
647        self.dyn_parse_state = state;
648    }
649
650    /// Borrow the parsed DNODE header, if any.
651    ///
652    /// # Examples
653    /// ```
654    /// use dynomite::msg::{Msg, MsgType};
655    /// assert!(Msg::new(1, MsgType::Unknown, true).dmsg().is_none());
656    /// ```
657    #[must_use]
658    pub fn dmsg(&self) -> Option<&Dmsg> {
659        self.dmsg.as_ref()
660    }
661
662    /// Mutably borrow the parsed DNODE header.
663    pub fn dmsg_mut(&mut self) -> Option<&mut Dmsg> {
664        self.dmsg.as_mut()
665    }
666
667    /// Attach a parsed DNODE header.
668    pub fn set_dmsg(&mut self, dmsg: Dmsg) {
669        self.dmsg = Some(dmsg);
670    }
671
672    /// Routing override.
673    #[must_use]
674    pub fn routing(&self) -> MsgRouting {
675        self.routing
676    }
677
678    /// Set the routing override.
679    pub fn set_routing(&mut self, routing: MsgRouting) {
680        self.routing = routing;
681    }
682
683    /// Consistency level for this message.
684    #[must_use]
685    pub fn consistency(&self) -> super::ConsistencyLevel {
686        self.consistency
687    }
688
689    /// Set the consistency level.
690    pub fn set_consistency(&mut self, level: super::ConsistencyLevel) {
691        self.consistency = level;
692    }
693
694    /// Microsecond timestamp recorded at request creation.
695    #[must_use]
696    pub fn timestamp_us(&self) -> u64 {
697        self.timestamp_us
698    }
699
700    /// Update the request timestamp.
701    pub fn set_timestamp_us(&mut self, ts: u64) {
702        self.timestamp_us = ts;
703    }
704
705    /// Datastore-level error code (`errno`-shaped).
706    #[must_use]
707    pub fn error_code(&self) -> i32 {
708        self.error_code
709    }
710
711    /// Set the datastore error code.
712    pub fn set_error_code(&mut self, e: i32) {
713        self.error_code = e;
714    }
715
716    /// Dynomite-level error code.
717    #[must_use]
718    pub fn dyn_error_code(&self) -> super::DynErrorCode {
719        self.dyn_error_code
720    }
721
722    /// Set the Dynomite error code.
723    pub fn set_dyn_error_code(&mut self, e: super::DynErrorCode) {
724        self.dyn_error_code = e;
725    }
726
727    /// Number of replies the request is still waiting on.
728    #[must_use]
729    pub fn awaiting_rsps(&self) -> u32 {
730        self.awaiting_rsps
731    }
732
733    /// Increment `awaiting_rsps`.
734    pub fn incr_awaiting_rsps(&mut self) {
735        self.awaiting_rsps = self.awaiting_rsps.saturating_add(1);
736    }
737
738    /// Decrement `awaiting_rsps`.
739    pub fn decr_awaiting_rsps(&mut self) {
740        self.awaiting_rsps = self.awaiting_rsps.saturating_sub(1);
741    }
742
743    /// Set `awaiting_rsps` directly. Used by the response manager
744    /// initialiser to seed the per-DC count.
745    pub fn set_awaiting_rsps(&mut self, n: u32) {
746        self.awaiting_rsps = n;
747    }
748
749    /// Borrow the fragment-id list.
750    #[must_use]
751    pub fn fragment_ids(&self) -> &[MsgId] {
752        &self.fragment_ids
753    }
754
755    /// Append `id` to the fragment-id list.
756    pub fn push_fragment_id(&mut self, id: MsgId) {
757        self.fragment_ids.push(id);
758    }
759
760    /// Borrow the response-id list.
761    #[must_use]
762    pub fn response_ids(&self) -> &[MsgId] {
763        &self.response_ids
764    }
765
766    /// Append `id` to the response-id list.
767    pub fn push_response_id(&mut self, id: MsgId) {
768        self.response_ids.push(id);
769    }
770
771    /// Currently-selected response id.
772    #[must_use]
773    pub fn selected_rsp(&self) -> Option<MsgId> {
774        self.selected_rsp
775    }
776
777    /// Set the currently-selected response id.
778    pub fn set_selected_rsp(&mut self, id: Option<MsgId>) {
779        self.selected_rsp = id;
780    }
781
782    /// Owner connection id (placeholder until Stage 9).
783    #[must_use]
784    pub fn owner(&self) -> Option<ConnId> {
785        self.owner
786    }
787
788    /// Set the owner connection id.
789    pub fn set_owner(&mut self, owner: Option<ConnId>) {
790        self.owner = owner;
791    }
792
793    /// Borrow the lifecycle flags.
794    #[must_use]
795    pub fn flags(&self) -> &MsgFlags {
796        &self.flags
797    }
798
799    /// Mutably borrow the lifecycle flags.
800    pub fn flags_mut(&mut self) -> &mut MsgFlags {
801        &mut self.flags
802    }
803
804    /// Set the `swallow` flag.
805    pub fn set_swallow(&mut self, on: bool) {
806        self.flags.swallow = on;
807    }
808
809    /// Set the `done` flag.
810    pub fn set_done(&mut self, on: bool) {
811        self.flags.done = on;
812    }
813
814    /// Set the `is_error` flag.
815    pub fn set_is_error(&mut self, on: bool) {
816        self.flags.is_error = on;
817    }
818
819    /// Borrow the local-DC response manager.
820    #[must_use]
821    pub fn rspmgr(&self) -> Option<&ResponseMgr> {
822        self.rspmgr.as_ref()
823    }
824
825    /// Mutably borrow the local-DC response manager.
826    pub fn rspmgr_mut(&mut self) -> Option<&mut ResponseMgr> {
827        self.rspmgr.as_mut()
828    }
829
830    /// Install a fresh response manager for the local DC.
831    pub fn set_rspmgr(&mut self, mgr: ResponseMgr) {
832        self.flags.rspmgrs_inited = true;
833        self.set_awaiting_rsps(u32::from(mgr.max_responses()));
834        self.rspmgr = Some(mgr);
835    }
836
837    /// Borrow the per-remote-DC response managers.
838    #[must_use]
839    pub fn additional_rspmgrs(&self) -> &[ResponseMgr] {
840        &self.additional_rspmgrs
841    }
842
843    /// Mutably borrow the per-remote-DC response managers.
844    pub fn additional_rspmgrs_mut(&mut self) -> &mut Vec<ResponseMgr> {
845        &mut self.additional_rspmgrs
846    }
847}
848
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built message carries the per-message flag defaults
    /// plus the documented routing, consistency and parser state.
    #[test]
    fn defaults_match_reference() {
        let m = Msg::new(1, MsgType::ReqRedisGet, true);
        assert!(m.flags().expect_datastore_reply);
        assert!(m.flags().is_read);
        assert!(m.flags().rewrite_with_ts_possible);
        assert!(!m.flags().is_error);
        assert!(!m.flags().rspmgrs_inited);
        assert_eq!(m.consistency(), super::super::ConsistencyLevel::DcOne);
        assert_eq!(m.routing(), MsgRouting::Normal);
        assert_eq!(m.dyn_parse_state(), DynParseState::Start);
    }

    /// `set_type` moves the current type into `orig_type` before
    /// installing the new one.
    #[test]
    fn set_type_preserves_original() {
        let mut m = Msg::new(1, MsgType::ReqRedisGet, true);
        assert_eq!(m.orig_type(), MsgType::Unknown);
        m.set_type(MsgType::ReqRedisSet);
        assert_eq!(m.ty(), MsgType::ReqRedisSet);
        assert_eq!(m.orig_type(), MsgType::ReqRedisGet);
    }

    /// The awaiting-responses counter saturates at both ends instead
    /// of wrapping.
    #[test]
    fn awaiting_saturates() {
        let mut m = Msg::new(1, MsgType::ReqRedisGet, true);
        m.decr_awaiting_rsps();
        assert_eq!(m.awaiting_rsps(), 0);
        m.incr_awaiting_rsps();
        m.incr_awaiting_rsps();
        assert_eq!(m.awaiting_rsps(), 2);
        m.decr_awaiting_rsps();
        assert_eq!(m.awaiting_rsps(), 1);

        // Upper bound: incrementing at the maximum must not wrap.
        m.set_awaiting_rsps(u32::MAX);
        m.incr_awaiting_rsps();
        assert_eq!(m.awaiting_rsps(), u32::MAX);
    }
}