selium_log/message/
mod.rs

1//! Message envelope/frame for a set of records.
2
3mod headers;
4mod slice;
5
6use bytes::{BufMut, Bytes};
7use crc32c::crc32c;
8pub use headers::Headers;
9pub use slice::MessageSlice;
10use std::mem::size_of;
11
/// Number of bytes occupied by the length marker of the [Headers].
pub const LEN_MARKER_SIZE: usize = size_of::<u64>();

/// Number of bytes occupied by the CRC value.
pub const CRC_SIZE: usize = size_of::<u32>();

/// Total number of bytes occupied by the message headers: two 64-bit fields
/// plus two 32-bit fields.
pub const HEADERS_SIZE: usize = 2 * size_of::<u64>() + 2 * size_of::<u32>();
21
22/// The Message frame contains information required to parse the message, a calculated CRC used to
23/// verify message integrity, and the encoded records.
24#[derive(Debug, Clone, PartialEq)]
25pub struct Message {
26    headers: Headers,
27    records: Bytes,
28    // TODO: implement CRC check after replication is implemented.
29    _crc: u32,
30}
31
32impl Message {
33    /// Constructs a Message instance with the provided [Headers], records
34    /// batch and CRC.
35    pub fn new(headers: Headers, records: &[u8], crc: u32) -> Self {
36        let records = Bytes::copy_from_slice(records);
37
38        Self {
39            headers,
40            records,
41            _crc: crc,
42        }
43    }
44
45    /// Shorthand method for constructing a Message instance with a batch
46    /// size of 1.
47    pub fn single(records: &[u8], version: u32) -> Self {
48        let headers = Headers::new(records.len(), 1, version);
49        Self::new(headers, records, 0)
50    }
51
52    /// Shorthand method for constructing a Message instance with a provided
53    /// batch size.
54    pub fn batch(records: &[u8], batch_size: u32, version: u32) -> Self {
55        let headers = Headers::new(records.len(), batch_size, version);
56        Self::new(headers, records, 0)
57    }
58
59    /// Encodes this Message instance into the provided buffer.
60    pub fn encode(&self, buffer: &mut Vec<u8>) {
61        self.headers.encode(buffer);
62        buffer.put_slice(&self.records);
63        let crc = crc32c(buffer);
64        buffer.put_u32(crc);
65    }
66
67    /// The message headers containing information about the records batch.
68    pub fn headers(&self) -> &Headers {
69        &self.headers
70    }
71
72    /// The encoded records batch.
73    pub fn records(&self) -> &[u8] {
74        &self.records
75    }
76}