use crate::core::types::MsgId;
use super::keypos::{ArgPos, KeyPos};
use super::msg_type::MsgType;
use super::response_mgr::ResponseMgr;
use crate::io::mbuf::MbufQueue;
use crate::proto::dnode::{Dmsg, DynParseState};
pub type ConnId = u64;
#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
pub enum MsgParseResult {
#[default]
Ok,
Error,
Repair,
Fragment,
Again,
Noop,
DynoConfig,
OomError,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
#[repr(u8)]
pub enum MsgRouting {
#[default]
Normal = 0,
LocalNodeOnly = 1,
TokenOwnerLocalRackOnly = 2,
AllNodesLocalRackOnly = 3,
AllNodesAllRacksAllDcs = 4,
}
impl MsgRouting {
#[must_use]
pub fn name(self) -> &'static str {
match self {
MsgRouting::Normal => "ROUTING_NORMAL",
MsgRouting::LocalNodeOnly => "ROUTING_LOCAL_NODE_ONLY",
MsgRouting::TokenOwnerLocalRackOnly => "ROUTING_TOKEN_OWNER_LOCAL_RACK_ONLY",
MsgRouting::AllNodesLocalRackOnly => "ROUTING_ALL_NODES_LOCAL_RACK_ONLY",
MsgRouting::AllNodesAllRacksAllDcs => "ROUTING_ALL_NODES_ALL_RACKS_ALL_DCS",
}
}
}
#[allow(clippy::struct_excessive_bools)]
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub struct MsgFlags {
pub is_error: bool,
pub is_ferror: bool,
pub quit: bool,
pub expect_datastore_reply: bool,
pub done: bool,
pub fdone: bool,
pub swallow: bool,
pub dnode_header_prepended: bool,
pub rsp_sent: bool,
pub is_read: bool,
pub needs_repair: bool,
pub rewrite_with_ts_possible: bool,
pub rspmgrs_inited: bool,
}
impl MsgFlags {
#[must_use]
fn default_for_msg() -> Self {
Self {
expect_datastore_reply: true,
is_read: true,
rewrite_with_ts_possible: true,
..Self::default()
}
}
}
#[derive(Debug)]
pub struct Msg {
id: MsgId,
parent_id: MsgId,
ty: MsgType,
orig_type: MsgType,
is_request: bool,
mbufs: MbufQueue,
mlen: u32,
parse_result: MsgParseResult,
dyn_parse_state: DynParseState,
dmsg: Option<Dmsg>,
routing: MsgRouting,
consistency: super::ConsistencyLevel,
timestamp_us: u64,
error_code: i32,
dyn_error_code: super::DynErrorCode,
awaiting_rsps: u32,
fragment_ids: Vec<MsgId>,
response_ids: Vec<MsgId>,
selected_rsp: Option<MsgId>,
owner: Option<ConnId>,
flags: MsgFlags,
rspmgr: Option<ResponseMgr>,
additional_rspmgrs: Vec<ResponseMgr>,
parser_state: u32,
parser_pos: usize,
parser_token: Option<usize>,
rlen: u32,
rntokens: u32,
ntokens: u32,
nkeys: u32,
vlen: u32,
integer: i64,
keys: Vec<KeyPos>,
args: Vec<ArgPos>,
end_marker: Option<usize>,
ntoken_start: Option<usize>,
ntoken_end: Option<usize>,
frag_id: u64,
}
impl Msg {
#[must_use]
pub fn new(id: MsgId, ty: MsgType, is_request: bool) -> Self {
Self {
id,
parent_id: 0,
ty,
orig_type: MsgType::Unknown,
is_request,
mbufs: MbufQueue::new(),
mlen: 0,
parse_result: MsgParseResult::default(),
dyn_parse_state: DynParseState::Start,
dmsg: None,
routing: MsgRouting::Normal,
consistency: super::ConsistencyLevel::DcOne,
timestamp_us: 0,
error_code: 0,
dyn_error_code: super::DynErrorCode::Ok,
awaiting_rsps: 0,
fragment_ids: Vec::new(),
response_ids: Vec::new(),
selected_rsp: None,
owner: None,
flags: MsgFlags::default_for_msg(),
rspmgr: None,
additional_rspmgrs: Vec::new(),
parser_state: 0,
parser_pos: 0,
parser_token: None,
rlen: 0,
rntokens: 0,
ntokens: 0,
nkeys: 0,
vlen: 0,
integer: 0,
keys: Vec::new(),
args: Vec::new(),
end_marker: None,
ntoken_start: None,
ntoken_end: None,
frag_id: 0,
}
}
#[must_use]
pub fn keys(&self) -> &[KeyPos] {
&self.keys
}
pub fn keys_mut(&mut self) -> &mut Vec<KeyPos> {
&mut self.keys
}
pub fn push_key(&mut self, k: KeyPos) {
self.keys.push(k);
}
#[must_use]
pub fn args(&self) -> &[ArgPos] {
&self.args
}
pub fn args_mut(&mut self) -> &mut Vec<ArgPos> {
&mut self.args
}
pub fn push_arg(&mut self, a: ArgPos) {
self.args.push(a);
}
#[must_use]
pub fn parser_state(&self) -> u32 {
self.parser_state
}
pub fn set_parser_state(&mut self, s: u32) {
self.parser_state = s;
}
#[must_use]
pub fn parser_pos(&self) -> usize {
self.parser_pos
}
pub fn set_parser_pos(&mut self, p: usize) {
self.parser_pos = p;
}
#[must_use]
pub fn parser_token(&self) -> Option<usize> {
self.parser_token
}
pub fn set_parser_token(&mut self, t: Option<usize>) {
self.parser_token = t;
}
#[must_use]
pub fn rlen(&self) -> u32 {
self.rlen
}
pub fn set_rlen(&mut self, n: u32) {
self.rlen = n;
}
#[must_use]
pub fn rntokens(&self) -> u32 {
self.rntokens
}
pub fn set_rntokens(&mut self, n: u32) {
self.rntokens = n;
}
#[must_use]
pub fn ntokens(&self) -> u32 {
self.ntokens
}
pub fn set_ntokens(&mut self, n: u32) {
self.ntokens = n;
}
#[must_use]
pub fn nkeys(&self) -> u32 {
self.nkeys
}
pub fn set_nkeys(&mut self, n: u32) {
self.nkeys = n;
}
#[must_use]
pub fn vlen(&self) -> u32 {
self.vlen
}
pub fn set_vlen(&mut self, n: u32) {
self.vlen = n;
}
#[must_use]
pub fn integer(&self) -> i64 {
self.integer
}
pub fn set_integer(&mut self, v: i64) {
self.integer = v;
}
#[must_use]
pub fn end_marker(&self) -> Option<usize> {
self.end_marker
}
pub fn set_end_marker(&mut self, m: Option<usize>) {
self.end_marker = m;
}
#[must_use]
pub fn ntoken_start(&self) -> Option<usize> {
self.ntoken_start
}
#[must_use]
pub fn ntoken_end(&self) -> Option<usize> {
self.ntoken_end
}
pub fn set_ntoken_span(&mut self, start: Option<usize>, end: Option<usize>) {
self.ntoken_start = start;
self.ntoken_end = end;
}
#[must_use]
pub fn frag_id(&self) -> u64 {
self.frag_id
}
pub fn set_frag_id(&mut self, id: u64) {
self.frag_id = id;
}
#[must_use]
pub fn id(&self) -> MsgId {
self.id
}
#[must_use]
pub fn parent_id(&self) -> MsgId {
self.parent_id
}
pub fn set_parent_id(&mut self, parent: MsgId) {
self.parent_id = parent;
}
#[must_use]
pub fn ty(&self) -> MsgType {
self.ty
}
pub fn set_type(&mut self, ty: MsgType) {
self.orig_type = self.ty;
self.ty = ty;
}
#[must_use]
pub fn orig_type(&self) -> MsgType {
self.orig_type
}
#[must_use]
pub fn is_request(&self) -> bool {
self.is_request
}
#[must_use]
pub fn mbufs(&self) -> &MbufQueue {
&self.mbufs
}
pub fn mbufs_mut(&mut self) -> &mut MbufQueue {
&mut self.mbufs
}
#[must_use]
pub fn mlen(&self) -> u32 {
self.mlen
}
pub fn recompute_mlen(&mut self) {
let total: usize = self.mbufs.iter().map(crate::io::mbuf::Mbuf::len).sum();
self.mlen = u32::try_from(total).unwrap_or(u32::MAX);
}
pub fn set_mlen(&mut self, mlen: u32) {
self.mlen = mlen;
}
#[must_use]
pub fn parse_result(&self) -> MsgParseResult {
self.parse_result
}
pub fn set_parse_result(&mut self, r: MsgParseResult) {
self.parse_result = r;
}
#[must_use]
pub fn dyn_parse_state(&self) -> DynParseState {
self.dyn_parse_state
}
pub fn set_dyn_parse_state(&mut self, state: DynParseState) {
self.dyn_parse_state = state;
}
#[must_use]
pub fn dmsg(&self) -> Option<&Dmsg> {
self.dmsg.as_ref()
}
pub fn dmsg_mut(&mut self) -> Option<&mut Dmsg> {
self.dmsg.as_mut()
}
pub fn set_dmsg(&mut self, dmsg: Dmsg) {
self.dmsg = Some(dmsg);
}
#[must_use]
pub fn routing(&self) -> MsgRouting {
self.routing
}
pub fn set_routing(&mut self, routing: MsgRouting) {
self.routing = routing;
}
#[must_use]
pub fn consistency(&self) -> super::ConsistencyLevel {
self.consistency
}
pub fn set_consistency(&mut self, level: super::ConsistencyLevel) {
self.consistency = level;
}
#[must_use]
pub fn timestamp_us(&self) -> u64 {
self.timestamp_us
}
pub fn set_timestamp_us(&mut self, ts: u64) {
self.timestamp_us = ts;
}
#[must_use]
pub fn error_code(&self) -> i32 {
self.error_code
}
pub fn set_error_code(&mut self, e: i32) {
self.error_code = e;
}
#[must_use]
pub fn dyn_error_code(&self) -> super::DynErrorCode {
self.dyn_error_code
}
pub fn set_dyn_error_code(&mut self, e: super::DynErrorCode) {
self.dyn_error_code = e;
}
#[must_use]
pub fn awaiting_rsps(&self) -> u32 {
self.awaiting_rsps
}
pub fn incr_awaiting_rsps(&mut self) {
self.awaiting_rsps = self.awaiting_rsps.saturating_add(1);
}
pub fn decr_awaiting_rsps(&mut self) {
self.awaiting_rsps = self.awaiting_rsps.saturating_sub(1);
}
pub fn set_awaiting_rsps(&mut self, n: u32) {
self.awaiting_rsps = n;
}
#[must_use]
pub fn fragment_ids(&self) -> &[MsgId] {
&self.fragment_ids
}
pub fn push_fragment_id(&mut self, id: MsgId) {
self.fragment_ids.push(id);
}
#[must_use]
pub fn response_ids(&self) -> &[MsgId] {
&self.response_ids
}
pub fn push_response_id(&mut self, id: MsgId) {
self.response_ids.push(id);
}
#[must_use]
pub fn selected_rsp(&self) -> Option<MsgId> {
self.selected_rsp
}
pub fn set_selected_rsp(&mut self, id: Option<MsgId>) {
self.selected_rsp = id;
}
#[must_use]
pub fn owner(&self) -> Option<ConnId> {
self.owner
}
pub fn set_owner(&mut self, owner: Option<ConnId>) {
self.owner = owner;
}
#[must_use]
pub fn flags(&self) -> &MsgFlags {
&self.flags
}
pub fn flags_mut(&mut self) -> &mut MsgFlags {
&mut self.flags
}
pub fn set_swallow(&mut self, on: bool) {
self.flags.swallow = on;
}
pub fn set_done(&mut self, on: bool) {
self.flags.done = on;
}
pub fn set_is_error(&mut self, on: bool) {
self.flags.is_error = on;
}
#[must_use]
pub fn rspmgr(&self) -> Option<&ResponseMgr> {
self.rspmgr.as_ref()
}
pub fn rspmgr_mut(&mut self) -> Option<&mut ResponseMgr> {
self.rspmgr.as_mut()
}
pub fn set_rspmgr(&mut self, mgr: ResponseMgr) {
self.flags.rspmgrs_inited = true;
self.set_awaiting_rsps(u32::from(mgr.max_responses()));
self.rspmgr = Some(mgr);
}
#[must_use]
pub fn additional_rspmgrs(&self) -> &[ResponseMgr] {
&self.additional_rspmgrs
}
pub fn additional_rspmgrs_mut(&mut self) -> &mut Vec<ResponseMgr> {
&mut self.additional_rspmgrs
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn defaults_match_reference() {
let m = Msg::new(1, MsgType::ReqRedisGet, true);
assert!(m.flags().expect_datastore_reply);
assert!(m.flags().is_read);
assert!(m.flags().rewrite_with_ts_possible);
assert!(!m.flags().is_error);
assert!(!m.flags().rspmgrs_inited);
assert_eq!(m.consistency(), super::super::ConsistencyLevel::DcOne);
assert_eq!(m.routing(), MsgRouting::Normal);
assert_eq!(m.dyn_parse_state(), DynParseState::Start);
}
#[test]
fn set_type_preserves_original() {
let mut m = Msg::new(1, MsgType::ReqRedisGet, true);
assert_eq!(m.orig_type(), MsgType::Unknown);
m.set_type(MsgType::ReqRedisSet);
assert_eq!(m.ty(), MsgType::ReqRedisSet);
assert_eq!(m.orig_type(), MsgType::ReqRedisGet);
}
#[test]
fn awaiting_saturates() {
let mut m = Msg::new(1, MsgType::ReqRedisGet, true);
m.decr_awaiting_rsps();
assert_eq!(m.awaiting_rsps(), 0);
m.incr_awaiting_rsps();
m.incr_awaiting_rsps();
assert_eq!(m.awaiting_rsps(), 2);
m.decr_awaiting_rsps();
assert_eq!(m.awaiting_rsps(), 1);
}
}