v_queue 0.3.0

simple file based queue
Documentation
pub const HEADER_SIZE: usize = 25;
pub const MAGIC_MARKER: u32 = 0xEEEF_FEEE;
pub const MAGIC_MARKER_BYTES: [u8; 4] = [0xEE, 0xFE, 0xEF, 0xEE];

// Distinct marker for v3 records. It differs from the v2 marker in the first
// byte so the v2 resync scanner cannot accidentally match a v3 record.
pub const MAGIC_MARKER_V3: u32 = 0xEEEF_FE33;
pub const MAGIC_MARKER_V3_BYTES: [u8; 4] = MAGIC_MARKER_V3.to_ne_bytes();

// High bit of the msg_type byte marks a record whose body is compressed.
// v2 records only ever store 'S'/'O' there, so the bit is always clear for
// them and old code keeps reading them as before.
pub const MSG_COMPRESSED_FLAG: u8 = 0x80;

// On-disk format of a queue part. v2 is the pre-0.3.0 layout (no per-part
// dictionary, no compression). v3 adds optional per-message Zstd compression
// with a per-part dictionary and uses MAGIC_MARKER_V3.
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum FormatVersion {
    V2,
    V3,
}

#[derive(PartialEq, Debug)]
pub enum ErrorQueue {
    NotReady = -911,
    AlreadyOpen = -8,
    FailWrite = -7,
    InvalidChecksum = -6,
    FailReadTailMessage = -5,
    FailOpen = -4,
    FailRead = -3,
    NotFound = -2,
    Other = -1,
}

#[derive(Debug, PartialEq, Copy, Clone)]
pub enum Mode {
    Read = 0,
    ReadWrite = 1,
    Default = 2,
}

#[derive(PartialEq, Debug)]
#[repr(u8)]
pub enum MsgType {
    String = b'S',
    Object = b'O',
}

impl From<u8> for MsgType {
    fn from(t: u8) -> Self {
        if t == b'O' {
            MsgType::Object
        } else {
            MsgType::String
        }
    }
}

impl MsgType {
    fn as_u8(&self) -> u8 {
        if *self == MsgType::Object {
            b'O'
        } else {
            b'S'
        }
    }
}

impl ErrorQueue {
    pub fn as_str(&self) -> &'static str {
        match *self {
            ErrorQueue::NotFound => "not found",
            ErrorQueue::Other => "other error",
            ErrorQueue::AlreadyOpen => "already open",
            ErrorQueue::FailOpen => "fail open",
            ErrorQueue::FailRead => "fail read",
            ErrorQueue::FailWrite => "fail write",
            ErrorQueue::NotReady => "not ready",
            ErrorQueue::FailReadTailMessage => "fail read tail message",
            ErrorQueue::InvalidChecksum => "invalid checksum",
        }
    }
}

#[derive(Debug)]
pub struct Header {
    pub start_pos: u64,
    pub msg_length: u32,
    pub magic_marker: u32,
    pub count_pushed: u32,
    pub crc: u32,
    pub msg_type: MsgType,
    // True when the body on disk is Zstd-compressed (v3 only). The flag is
    // carried in the high bit of the msg_type byte.
    pub compressed: bool,
}

impl Header {
    pub fn create_from_buf(buf: &[u8]) -> Self {
        let type_byte = buf[20];
        Header {
            start_pos: u64::from_ne_bytes([buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7]]),
            msg_length: u32::from_ne_bytes([buf[8], buf[9], buf[10], buf[11]]),
            magic_marker: u32::from_ne_bytes([buf[12], buf[13], buf[14], buf[15]]),
            count_pushed: u32::from_ne_bytes([buf[16], buf[17], buf[18], buf[19]]),
            msg_type: MsgType::from(type_byte & !MSG_COMPRESSED_FLAG),
            compressed: (type_byte & MSG_COMPRESSED_FLAG) != 0,
            crc: u32::from_ne_bytes([buf[21], buf[22], buf[23], buf[24]]),
        }
    }

    pub fn to_buf(&self, buf: &mut [u8; HEADER_SIZE]) {
        buf[0..8].clone_from_slice(&u64::to_ne_bytes(self.start_pos));
        buf[8..12].clone_from_slice(&u32::to_ne_bytes(self.msg_length));
        buf[12..16].clone_from_slice(&u32::to_ne_bytes(self.magic_marker));
        buf[16..20].clone_from_slice(&u32::to_ne_bytes(self.count_pushed));
        let mut type_byte = self.msg_type.as_u8();
        if self.compressed {
            type_byte |= MSG_COMPRESSED_FLAG;
        }
        buf[20] = type_byte;
        // crc placeholder, filled in by the writer after hashing
        buf[21] = 0;
        buf[22] = 0;
        buf[23] = 0;
        buf[24] = 0;
    }
}