dynomite/msg/message.rs
1//! In-memory representation of a single Dynomite message.
2//!
3//! [`Msg`] is the unit that flows through the engine: a request from
4//! a client connection, a response from the upstream datastore, or
5//! an internal control packet. It carries the parsed metadata, the
6//! mbuf chain holding the on-the-wire bytes, the parser state used
7//! by the protocol decoders, and the bookkeeping flags every layer
8//! sets and reads.
9//!
10//! This stage builds the data shape and exposes the field-level
11//! accessors. The connection-coupled lifecycle paths (timeout
12//! tracking, queue threading, parser dispatch) land in Stage 9 once
13//! the connection state machine exists.
14
15use crate::core::types::MsgId;
16
17use super::keypos::{ArgPos, KeyPos};
18use super::msg_type::MsgType;
19use super::response_mgr::ResponseMgr;
20use crate::io::mbuf::MbufQueue;
21use crate::proto::dnode::{Dmsg, DynParseState};
22
23/// Stable connection identifier carried by [`Msg::owner`].
24///
25/// Stage 9 replaces this alias with a typed `Conn` reference. Until
26/// then, the value is just a unique 64-bit tag the connection layer
27/// stamps on every message it produces.
28pub type ConnId = u64;
29
30/// Parser outcome reported by datastore protocol decoders.
31///
32/// The variants mirror the reference engine's `msg_parse_result_t`
33/// so downstream callers can dispatch on the same semantics.
34#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
35pub enum MsgParseResult {
36 /// Parsing completed for this message.
37 #[default]
38 Ok,
39 /// Parser detected unrecoverable framing error.
40 Error,
41 /// Parser consumed valid bytes but expects to be re-driven on
42 /// the trailing bytes after a buffer split.
43 Repair,
44 /// Multi-key request needs to be fragmented before forwarding.
45 Fragment,
46 /// Need more bytes; caller must read more before retrying.
47 Again,
48 /// Parsing succeeded; downstream layer should take no action.
49 Noop,
50 /// Message was a Dynomite configuration directive.
51 DynoConfig,
52 /// Out-of-memory while parsing.
53 OomError,
54}
55
56/// Routing override applied to a request.
57///
58/// Mirrors the reference engine's `msg_routing_t`. The default
59/// (`Normal`) honors the configured key-hash routing; the other
60/// variants short-circuit it for diagnostic and special-purpose
61/// paths.
62#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
63#[repr(u8)]
64pub enum MsgRouting {
65 /// Apply the standard key-hash routing.
66 #[default]
67 Normal = 0,
68 /// Send to the local node only, ignoring the key hash.
69 LocalNodeOnly = 1,
70 /// Apply the key hash but stay within the local rack.
71 TokenOwnerLocalRackOnly = 2,
72 /// Send to every node in the local rack, ignoring the key hash.
73 AllNodesLocalRackOnly = 3,
74 /// Send to every node in every rack of every datacenter.
75 AllNodesAllRacksAllDcs = 4,
76}
77
78impl MsgRouting {
79 /// Stable string name for diagnostic output.
80 ///
81 /// # Examples
82 ///
83 /// ```
84 /// use dynomite::msg::MsgRouting;
85 /// assert_eq!(MsgRouting::Normal.name(), "ROUTING_NORMAL");
86 /// ```
87 #[must_use]
88 pub fn name(self) -> &'static str {
89 match self {
90 MsgRouting::Normal => "ROUTING_NORMAL",
91 MsgRouting::LocalNodeOnly => "ROUTING_LOCAL_NODE_ONLY",
92 MsgRouting::TokenOwnerLocalRackOnly => "ROUTING_TOKEN_OWNER_LOCAL_RACK_ONLY",
93 MsgRouting::AllNodesLocalRackOnly => "ROUTING_ALL_NODES_LOCAL_RACK_ONLY",
94 MsgRouting::AllNodesAllRacksAllDcs => "ROUTING_ALL_NODES_ALL_RACKS_ALL_DCS",
95 }
96 }
97}
98
99/// Bag of boolean lifecycle flags used by the request and response
100/// pipelines.
101#[allow(clippy::struct_excessive_bools)]
102#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
103pub struct MsgFlags {
104 /// Message is currently in the error state.
105 pub is_error: bool,
106 /// At least one fragment of this multi-message vector errored.
107 pub is_ferror: bool,
108 /// Caller issued a `quit` request.
109 pub quit: bool,
110 /// Caller expects the datastore to produce a reply.
111 pub expect_datastore_reply: bool,
112 /// Reply has been routed to the caller.
113 pub done: bool,
114 /// Every fragment of this vector finished.
115 pub fdone: bool,
116 /// Discard the corresponding response on arrival.
117 pub swallow: bool,
118 /// A DNODE header has already been prepended to the mbuf chain.
119 pub dnode_header_prepended: bool,
120 /// Response for this request has been written to the wire.
121 pub rsp_sent: bool,
122 /// Read request (vs write).
123 pub is_read: bool,
124 /// Marked by the dispatcher that a repair must be issued.
125 pub needs_repair: bool,
126 /// Request body can be safely rewritten with a timestamp.
127 pub rewrite_with_ts_possible: bool,
128 /// Response managers have been initialised.
129 pub rspmgrs_inited: bool,
130}
131
132impl MsgFlags {
133 /// Construct flags with the same default state the reference
134 /// engine sets in `_msg_get`.
135 #[must_use]
136 fn default_for_msg() -> Self {
137 Self {
138 expect_datastore_reply: true,
139 is_read: true,
140 rewrite_with_ts_possible: true,
141 ..Self::default()
142 }
143 }
144}
145
146/// One Dynomite message: the in-memory representation of a request
147/// or a response on its way through the engine.
148#[derive(Debug)]
149pub struct Msg {
150 id: MsgId,
151 parent_id: MsgId,
152 ty: MsgType,
153 orig_type: MsgType,
154 is_request: bool,
155 mbufs: MbufQueue,
156 mlen: u32,
157 parse_result: MsgParseResult,
158 dyn_parse_state: DynParseState,
159 dmsg: Option<Dmsg>,
160 routing: MsgRouting,
161 consistency: super::ConsistencyLevel,
162 timestamp_us: u64,
163 error_code: i32,
164 dyn_error_code: super::DynErrorCode,
165 awaiting_rsps: u32,
166 fragment_ids: Vec<MsgId>,
167 response_ids: Vec<MsgId>,
168 selected_rsp: Option<MsgId>,
169 owner: Option<ConnId>,
170 flags: MsgFlags,
171 rspmgr: Option<ResponseMgr>,
172 additional_rspmgrs: Vec<ResponseMgr>,
173 parser_state: u32,
174 parser_pos: usize,
175 parser_token: Option<usize>,
176 rlen: u32,
177 rntokens: u32,
178 ntokens: u32,
179 nkeys: u32,
180 vlen: u32,
181 integer: i64,
182 keys: Vec<KeyPos>,
183 args: Vec<ArgPos>,
184 end_marker: Option<usize>,
185 ntoken_start: Option<usize>,
186 ntoken_end: Option<usize>,
187 frag_id: u64,
188}
189
190impl Msg {
191 /// Construct a new message with `id`, type tag `ty`, and the
192 /// request/response orientation `is_request`. The mbuf chain
193 /// starts empty and the parser is reset to `DynParseState::Start`.
194 ///
195 /// # Examples
196 ///
197 /// ```
198 /// use dynomite::msg::{Msg, MsgType};
199 /// let m = Msg::new(1, MsgType::ReqRedisGet, true);
200 /// assert_eq!(m.id(), 1);
201 /// assert!(m.is_request());
202 /// ```
203 #[must_use]
204 pub fn new(id: MsgId, ty: MsgType, is_request: bool) -> Self {
205 Self {
206 id,
207 parent_id: 0,
208 ty,
209 orig_type: MsgType::Unknown,
210 is_request,
211 mbufs: MbufQueue::new(),
212 mlen: 0,
213 parse_result: MsgParseResult::default(),
214 dyn_parse_state: DynParseState::Start,
215 dmsg: None,
216 routing: MsgRouting::Normal,
217 consistency: super::ConsistencyLevel::DcOne,
218 timestamp_us: 0,
219 error_code: 0,
220 dyn_error_code: super::DynErrorCode::Ok,
221 awaiting_rsps: 0,
222 fragment_ids: Vec::new(),
223 response_ids: Vec::new(),
224 selected_rsp: None,
225 owner: None,
226 flags: MsgFlags::default_for_msg(),
227 rspmgr: None,
228 additional_rspmgrs: Vec::new(),
229 parser_state: 0,
230 parser_pos: 0,
231 parser_token: None,
232 rlen: 0,
233 rntokens: 0,
234 ntokens: 0,
235 nkeys: 0,
236 vlen: 0,
237 integer: 0,
238 keys: Vec::new(),
239 args: Vec::new(),
240 end_marker: None,
241 ntoken_start: None,
242 ntoken_end: None,
243 frag_id: 0,
244 }
245 }
246
247 /// Borrow the parsed key list. Populated by the protocol parsers.
248 ///
249 /// # Examples
250 /// ```
251 /// use dynomite::msg::{Msg, MsgType};
252 /// assert!(Msg::new(1, MsgType::Unknown, true).keys().is_empty());
253 /// ```
254 #[must_use]
255 pub fn keys(&self) -> &[KeyPos] {
256 &self.keys
257 }
258
259 /// Mutably borrow the parsed key list.
260 pub fn keys_mut(&mut self) -> &mut Vec<KeyPos> {
261 &mut self.keys
262 }
263
264 /// Append a parsed key. Used by the protocol parsers.
265 pub fn push_key(&mut self, k: KeyPos) {
266 self.keys.push(k);
267 }
268
269 /// Borrow the parsed argument list.
270 #[must_use]
271 pub fn args(&self) -> &[ArgPos] {
272 &self.args
273 }
274
275 /// Mutably borrow the parsed argument list.
276 pub fn args_mut(&mut self) -> &mut Vec<ArgPos> {
277 &mut self.args
278 }
279
280 /// Append a parsed argument.
281 pub fn push_arg(&mut self, a: ArgPos) {
282 self.args.push(a);
283 }
284
285 /// Protocol-specific parser state index. Each parser defines
286 /// its own state alphabet keyed on this `u32`.
287 #[must_use]
288 pub fn parser_state(&self) -> u32 {
289 self.parser_state
290 }
291
292 /// Set the protocol parser state index.
293 pub fn set_parser_state(&mut self, s: u32) {
294 self.parser_state = s;
295 }
296
297 /// Cursor offset into the input buffer where the next byte
298 /// should be read.
299 #[must_use]
300 pub fn parser_pos(&self) -> usize {
301 self.parser_pos
302 }
303
304 /// Set the parser cursor offset.
305 pub fn set_parser_pos(&mut self, p: usize) {
306 self.parser_pos = p;
307 }
308
309 /// Optional token marker offset.
310 #[must_use]
311 pub fn parser_token(&self) -> Option<usize> {
312 self.parser_token
313 }
314
315 /// Set the optional token marker offset.
316 pub fn set_parser_token(&mut self, t: Option<usize>) {
317 self.parser_token = t;
318 }
319
320 /// Remaining length of the bulk argument the parser is currently
321 /// consuming.
322 #[must_use]
323 pub fn rlen(&self) -> u32 {
324 self.rlen
325 }
326
327 /// Set the bulk-argument remaining length.
328 pub fn set_rlen(&mut self, n: u32) {
329 self.rlen = n;
330 }
331
332 /// Remaining unprocessed token count for the current parse.
333 #[must_use]
334 pub fn rntokens(&self) -> u32 {
335 self.rntokens
336 }
337
338 /// Set the remaining-token counter.
339 pub fn set_rntokens(&mut self, n: u32) {
340 self.rntokens = n;
341 }
342
343 /// Total parsed token count for the current message.
344 #[must_use]
345 pub fn ntokens(&self) -> u32 {
346 self.ntokens
347 }
348
349 /// Set the total parsed token count.
350 pub fn set_ntokens(&mut self, n: u32) {
351 self.ntokens = n;
352 }
353
354 /// Number of keys the script (EVAL/EVALSHA) declared.
355 #[must_use]
356 pub fn nkeys(&self) -> u32 {
357 self.nkeys
358 }
359
360 /// Set the script-key count.
361 pub fn set_nkeys(&mut self, n: u32) {
362 self.nkeys = n;
363 }
364
365 /// Storage-command value length.
366 #[must_use]
367 pub fn vlen(&self) -> u32 {
368 self.vlen
369 }
370
371 /// Set the storage-command value length.
372 pub fn set_vlen(&mut self, n: u32) {
373 self.vlen = n;
374 }
375
376 /// Integer value carried by the response (`:n\r\n`).
377 #[must_use]
378 pub fn integer(&self) -> i64 {
379 self.integer
380 }
381
382 /// Set the integer response value.
383 pub fn set_integer(&mut self, v: i64) {
384 self.integer = v;
385 }
386
387 /// Offset of the multi-bulk `END` marker in the response, if any.
388 #[must_use]
389 pub fn end_marker(&self) -> Option<usize> {
390 self.end_marker
391 }
392
393 /// Set the response `END` marker offset.
394 pub fn set_end_marker(&mut self, m: Option<usize>) {
395 self.end_marker = m;
396 }
397
398 /// Start offset of the multi-bulk argument count token.
399 #[must_use]
400 pub fn ntoken_start(&self) -> Option<usize> {
401 self.ntoken_start
402 }
403
404 /// End offset (exclusive) of the multi-bulk argument count token.
405 #[must_use]
406 pub fn ntoken_end(&self) -> Option<usize> {
407 self.ntoken_end
408 }
409
410 /// Set the multi-bulk argument count span.
411 pub fn set_ntoken_span(&mut self, start: Option<usize>, end: Option<usize>) {
412 self.ntoken_start = start;
413 self.ntoken_end = end;
414 }
415
416 /// Fragment id grouping all sub-messages produced from a
417 /// multi-key request.
418 #[must_use]
419 pub fn frag_id(&self) -> u64 {
420 self.frag_id
421 }
422
423 /// Set the fragment id.
424 pub fn set_frag_id(&mut self, id: u64) {
425 self.frag_id = id;
426 }
427
428 /// Message id.
429 ///
430 /// # Examples
431 /// ```
432 /// use dynomite::msg::{Msg, MsgType};
433 /// assert_eq!(Msg::new(99, MsgType::Unknown, true).id(), 99);
434 /// ```
435 #[must_use]
436 pub fn id(&self) -> MsgId {
437 self.id
438 }
439
440 /// Parent id (zero when not a fragment).
441 ///
442 /// # Examples
443 /// ```
444 /// use dynomite::msg::{Msg, MsgType};
445 /// assert_eq!(Msg::new(1, MsgType::Unknown, true).parent_id(), 0);
446 /// ```
447 #[must_use]
448 pub fn parent_id(&self) -> MsgId {
449 self.parent_id
450 }
451
452 /// Set the parent id.
453 ///
454 /// # Examples
455 /// ```
456 /// use dynomite::msg::{Msg, MsgType};
457 /// let mut m = Msg::new(2, MsgType::Unknown, true);
458 /// m.set_parent_id(1);
459 /// assert_eq!(m.parent_id(), 1);
460 /// ```
461 pub fn set_parent_id(&mut self, parent: MsgId) {
462 self.parent_id = parent;
463 }
464
465 /// Message type tag.
466 ///
467 /// # Examples
468 /// ```
469 /// use dynomite::msg::{Msg, MsgType};
470 /// assert_eq!(Msg::new(1, MsgType::ReqMcGet, true).ty(), MsgType::ReqMcGet);
471 /// ```
472 #[must_use]
473 pub fn ty(&self) -> MsgType {
474 self.ty
475 }
476
477 /// Override the message type. The previous value is preserved as
478 /// the original type so query rewriters can recover it.
479 ///
480 /// # Examples
481 /// ```
482 /// use dynomite::msg::{Msg, MsgType};
483 /// let mut m = Msg::new(1, MsgType::ReqRedisGet, true);
484 /// m.set_type(MsgType::ReqRedisSet);
485 /// assert_eq!(m.ty(), MsgType::ReqRedisSet);
486 /// assert_eq!(m.orig_type(), MsgType::ReqRedisGet);
487 /// ```
488 pub fn set_type(&mut self, ty: MsgType) {
489 self.orig_type = self.ty;
490 self.ty = ty;
491 }
492
493 /// Original message type before any rewrite.
494 ///
495 /// # Examples
496 /// ```
497 /// use dynomite::msg::{Msg, MsgType};
498 /// assert_eq!(
499 /// Msg::new(1, MsgType::ReqRedisGet, true).orig_type(),
500 /// MsgType::Unknown,
501 /// );
502 /// ```
503 #[must_use]
504 pub fn orig_type(&self) -> MsgType {
505 self.orig_type
506 }
507
508 /// True for requests.
509 ///
510 /// # Examples
511 /// ```
512 /// use dynomite::msg::{Msg, MsgType};
513 /// assert!(Msg::new(1, MsgType::ReqMcGet, true).is_request());
514 /// assert!(!Msg::new(1, MsgType::RspMcStored, false).is_request());
515 /// ```
516 #[must_use]
517 pub fn is_request(&self) -> bool {
518 self.is_request
519 }
520
521 /// Borrow the underlying mbuf chain.
522 ///
523 /// # Examples
524 /// ```
525 /// use dynomite::msg::{Msg, MsgType};
526 /// let m = Msg::new(1, MsgType::Unknown, true);
527 /// assert!(m.mbufs().is_empty());
528 /// ```
529 #[must_use]
530 pub fn mbufs(&self) -> &MbufQueue {
531 &self.mbufs
532 }
533
534 /// Mutably borrow the mbuf chain.
535 ///
536 /// # Examples
537 /// ```
538 /// use dynomite::io::mbuf::MbufPool;
539 /// use dynomite::msg::{Msg, MsgType};
540 ///
541 /// let pool = MbufPool::default();
542 /// let mut m = Msg::new(1, MsgType::Unknown, true);
543 /// m.mbufs_mut().push_back(pool.get());
544 /// assert_eq!(m.mbufs().len(), 1);
545 /// ```
546 pub fn mbufs_mut(&mut self) -> &mut MbufQueue {
547 &mut self.mbufs
548 }
549
550 /// Cumulative readable byte count of the chain (`mlen`).
551 ///
552 /// # Examples
553 /// ```
554 /// use dynomite::msg::{Msg, MsgType};
555 /// assert_eq!(Msg::new(1, MsgType::Unknown, true).mlen(), 0);
556 /// ```
557 #[must_use]
558 pub fn mlen(&self) -> u32 {
559 self.mlen
560 }
561
562 /// Refresh `mlen` from the current chain.
563 ///
564 /// The parser updates the length as it consumes bytes; callers
565 /// that mutate the chain directly call this to keep the cached
566 /// length consistent with the actual chain content.
567 ///
568 /// # Examples
569 /// ```
570 /// use dynomite::io::mbuf::MbufPool;
571 /// use dynomite::msg::{Msg, MsgType};
572 ///
573 /// let pool = MbufPool::default();
574 /// let mut m = Msg::new(1, MsgType::Unknown, true);
575 /// let mut buf = pool.get();
576 /// buf.recv(b"hi");
577 /// m.mbufs_mut().push_back(buf);
578 /// m.recompute_mlen();
579 /// assert_eq!(m.mlen(), 2);
580 /// ```
581 pub fn recompute_mlen(&mut self) {
582 let total: usize = self.mbufs.iter().map(crate::io::mbuf::Mbuf::len).sum();
583 self.mlen = u32::try_from(total).unwrap_or(u32::MAX);
584 }
585
586 /// Direct setter for `mlen`. Use [`Msg::recompute_mlen`] when the
587 /// chain has been mutated; this entry point exists for parsers
588 /// that adjust the value as they consume bytes.
589 ///
590 /// # Examples
591 /// ```
592 /// use dynomite::msg::{Msg, MsgType};
593 /// let mut m = Msg::new(1, MsgType::Unknown, true);
594 /// m.set_mlen(123);
595 /// assert_eq!(m.mlen(), 123);
596 /// ```
597 pub fn set_mlen(&mut self, mlen: u32) {
598 self.mlen = mlen;
599 }
600
601 /// Last parse outcome.
602 ///
603 /// # Examples
604 /// ```
605 /// use dynomite::msg::{Msg, MsgParseResult, MsgType};
606 /// assert_eq!(
607 /// Msg::new(1, MsgType::Unknown, true).parse_result(),
608 /// MsgParseResult::Ok,
609 /// );
610 /// ```
611 #[must_use]
612 pub fn parse_result(&self) -> MsgParseResult {
613 self.parse_result
614 }
615
616 /// Set the parse outcome.
617 ///
618 /// # Examples
619 /// ```
620 /// use dynomite::msg::{Msg, MsgParseResult, MsgType};
621 /// let mut m = Msg::new(1, MsgType::Unknown, true);
622 /// m.set_parse_result(MsgParseResult::Again);
623 /// assert_eq!(m.parse_result(), MsgParseResult::Again);
624 /// ```
625 pub fn set_parse_result(&mut self, r: MsgParseResult) {
626 self.parse_result = r;
627 }
628
629 /// Current DNODE parser state.
630 ///
631 /// # Examples
632 /// ```
633 /// use dynomite::msg::{Msg, MsgType};
634 /// use dynomite::proto::dnode::DynParseState;
635 /// assert_eq!(
636 /// Msg::new(1, MsgType::Unknown, true).dyn_parse_state(),
637 /// DynParseState::Start,
638 /// );
639 /// ```
640 #[must_use]
641 pub fn dyn_parse_state(&self) -> DynParseState {
642 self.dyn_parse_state
643 }
644
645 /// Set the DNODE parser state.
646 pub fn set_dyn_parse_state(&mut self, state: DynParseState) {
647 self.dyn_parse_state = state;
648 }
649
650 /// Borrow the parsed DNODE header, if any.
651 ///
652 /// # Examples
653 /// ```
654 /// use dynomite::msg::{Msg, MsgType};
655 /// assert!(Msg::new(1, MsgType::Unknown, true).dmsg().is_none());
656 /// ```
657 #[must_use]
658 pub fn dmsg(&self) -> Option<&Dmsg> {
659 self.dmsg.as_ref()
660 }
661
662 /// Mutably borrow the parsed DNODE header.
663 pub fn dmsg_mut(&mut self) -> Option<&mut Dmsg> {
664 self.dmsg.as_mut()
665 }
666
667 /// Attach a parsed DNODE header.
668 pub fn set_dmsg(&mut self, dmsg: Dmsg) {
669 self.dmsg = Some(dmsg);
670 }
671
672 /// Routing override.
673 #[must_use]
674 pub fn routing(&self) -> MsgRouting {
675 self.routing
676 }
677
678 /// Set the routing override.
679 pub fn set_routing(&mut self, routing: MsgRouting) {
680 self.routing = routing;
681 }
682
683 /// Consistency level for this message.
684 #[must_use]
685 pub fn consistency(&self) -> super::ConsistencyLevel {
686 self.consistency
687 }
688
689 /// Set the consistency level.
690 pub fn set_consistency(&mut self, level: super::ConsistencyLevel) {
691 self.consistency = level;
692 }
693
694 /// Microsecond timestamp recorded at request creation.
695 #[must_use]
696 pub fn timestamp_us(&self) -> u64 {
697 self.timestamp_us
698 }
699
700 /// Update the request timestamp.
701 pub fn set_timestamp_us(&mut self, ts: u64) {
702 self.timestamp_us = ts;
703 }
704
705 /// Datastore-level error code (`errno`-shaped).
706 #[must_use]
707 pub fn error_code(&self) -> i32 {
708 self.error_code
709 }
710
711 /// Set the datastore error code.
712 pub fn set_error_code(&mut self, e: i32) {
713 self.error_code = e;
714 }
715
716 /// Dynomite-level error code.
717 #[must_use]
718 pub fn dyn_error_code(&self) -> super::DynErrorCode {
719 self.dyn_error_code
720 }
721
722 /// Set the Dynomite error code.
723 pub fn set_dyn_error_code(&mut self, e: super::DynErrorCode) {
724 self.dyn_error_code = e;
725 }
726
727 /// Number of replies the request is still waiting on.
728 #[must_use]
729 pub fn awaiting_rsps(&self) -> u32 {
730 self.awaiting_rsps
731 }
732
733 /// Increment `awaiting_rsps`.
734 pub fn incr_awaiting_rsps(&mut self) {
735 self.awaiting_rsps = self.awaiting_rsps.saturating_add(1);
736 }
737
738 /// Decrement `awaiting_rsps`.
739 pub fn decr_awaiting_rsps(&mut self) {
740 self.awaiting_rsps = self.awaiting_rsps.saturating_sub(1);
741 }
742
743 /// Set `awaiting_rsps` directly. Used by the response manager
744 /// initialiser to seed the per-DC count.
745 pub fn set_awaiting_rsps(&mut self, n: u32) {
746 self.awaiting_rsps = n;
747 }
748
749 /// Borrow the fragment-id list.
750 #[must_use]
751 pub fn fragment_ids(&self) -> &[MsgId] {
752 &self.fragment_ids
753 }
754
755 /// Append `id` to the fragment-id list.
756 pub fn push_fragment_id(&mut self, id: MsgId) {
757 self.fragment_ids.push(id);
758 }
759
760 /// Borrow the response-id list.
761 #[must_use]
762 pub fn response_ids(&self) -> &[MsgId] {
763 &self.response_ids
764 }
765
766 /// Append `id` to the response-id list.
767 pub fn push_response_id(&mut self, id: MsgId) {
768 self.response_ids.push(id);
769 }
770
771 /// Currently-selected response id.
772 #[must_use]
773 pub fn selected_rsp(&self) -> Option<MsgId> {
774 self.selected_rsp
775 }
776
777 /// Set the currently-selected response id.
778 pub fn set_selected_rsp(&mut self, id: Option<MsgId>) {
779 self.selected_rsp = id;
780 }
781
782 /// Owner connection id (placeholder until Stage 9).
783 #[must_use]
784 pub fn owner(&self) -> Option<ConnId> {
785 self.owner
786 }
787
788 /// Set the owner connection id.
789 pub fn set_owner(&mut self, owner: Option<ConnId>) {
790 self.owner = owner;
791 }
792
793 /// Borrow the lifecycle flags.
794 #[must_use]
795 pub fn flags(&self) -> &MsgFlags {
796 &self.flags
797 }
798
799 /// Mutably borrow the lifecycle flags.
800 pub fn flags_mut(&mut self) -> &mut MsgFlags {
801 &mut self.flags
802 }
803
804 /// Set the `swallow` flag.
805 pub fn set_swallow(&mut self, on: bool) {
806 self.flags.swallow = on;
807 }
808
809 /// Set the `done` flag.
810 pub fn set_done(&mut self, on: bool) {
811 self.flags.done = on;
812 }
813
814 /// Set the `is_error` flag.
815 pub fn set_is_error(&mut self, on: bool) {
816 self.flags.is_error = on;
817 }
818
819 /// Borrow the local-DC response manager.
820 #[must_use]
821 pub fn rspmgr(&self) -> Option<&ResponseMgr> {
822 self.rspmgr.as_ref()
823 }
824
825 /// Mutably borrow the local-DC response manager.
826 pub fn rspmgr_mut(&mut self) -> Option<&mut ResponseMgr> {
827 self.rspmgr.as_mut()
828 }
829
830 /// Install a fresh response manager for the local DC.
831 pub fn set_rspmgr(&mut self, mgr: ResponseMgr) {
832 self.flags.rspmgrs_inited = true;
833 self.set_awaiting_rsps(u32::from(mgr.max_responses()));
834 self.rspmgr = Some(mgr);
835 }
836
837 /// Borrow the per-remote-DC response managers.
838 #[must_use]
839 pub fn additional_rspmgrs(&self) -> &[ResponseMgr] {
840 &self.additional_rspmgrs
841 }
842
843 /// Mutably borrow the per-remote-DC response managers.
844 pub fn additional_rspmgrs_mut(&mut self) -> &mut Vec<ResponseMgr> {
845 &mut self.additional_rspmgrs
846 }
847}
848
849#[cfg(test)]
850mod tests {
851 use super::*;
852
853 #[test]
854 fn defaults_match_reference() {
855 let m = Msg::new(1, MsgType::ReqRedisGet, true);
856 assert!(m.flags().expect_datastore_reply);
857 assert!(m.flags().is_read);
858 assert!(m.flags().rewrite_with_ts_possible);
859 assert!(!m.flags().is_error);
860 assert!(!m.flags().rspmgrs_inited);
861 assert_eq!(m.consistency(), super::super::ConsistencyLevel::DcOne);
862 assert_eq!(m.routing(), MsgRouting::Normal);
863 assert_eq!(m.dyn_parse_state(), DynParseState::Start);
864 }
865
866 #[test]
867 fn set_type_preserves_original() {
868 let mut m = Msg::new(1, MsgType::ReqRedisGet, true);
869 assert_eq!(m.orig_type(), MsgType::Unknown);
870 m.set_type(MsgType::ReqRedisSet);
871 assert_eq!(m.ty(), MsgType::ReqRedisSet);
872 assert_eq!(m.orig_type(), MsgType::ReqRedisGet);
873 }
874
875 #[test]
876 fn awaiting_saturates() {
877 let mut m = Msg::new(1, MsgType::ReqRedisGet, true);
878 m.decr_awaiting_rsps();
879 assert_eq!(m.awaiting_rsps(), 0);
880 m.incr_awaiting_rsps();
881 m.incr_awaiting_rsps();
882 assert_eq!(m.awaiting_rsps(), 2);
883 m.decr_awaiting_rsps();
884 assert_eq!(m.awaiting_rsps(), 1);
885 }
886}