selium_log/message/
mod.rs1mod headers;
4mod slice;
5
6use bytes::{BufMut, Bytes};
7use crc32c::crc32c;
8pub use headers::Headers;
9pub use slice::MessageSlice;
10use std::mem::size_of;
11
/// Size in bytes of a length marker, which is stored as a `u64`.
pub const LEN_MARKER_SIZE: usize = size_of::<u64>();

/// Size in bytes of the `u32` checksum that `Message::encode` appends after the records.
pub const CRC_SIZE: usize = size_of::<u32>();

/// Combined size in bytes of the fixed-width header fields.
///
/// NOTE(review): the terms presumably mirror the field layout of `Headers`
/// (`u64`, `u32`, `u32`, `u64`) — confirm against `headers.rs`.
pub const HEADERS_SIZE: usize = size_of::<u64>()
    + size_of::<u32>()
    + size_of::<u32>()
    + size_of::<u64>();
21
22#[derive(Debug, Clone, PartialEq)]
25pub struct Message {
26 headers: Headers,
27 records: Bytes,
28 _crc: u32,
30}
31
32impl Message {
33 pub fn new(headers: Headers, records: &[u8], crc: u32) -> Self {
36 let records = Bytes::copy_from_slice(records);
37
38 Self {
39 headers,
40 records,
41 _crc: crc,
42 }
43 }
44
45 pub fn single(records: &[u8], version: u32) -> Self {
48 let headers = Headers::new(records.len(), 1, version);
49 Self::new(headers, records, 0)
50 }
51
52 pub fn batch(records: &[u8], batch_size: u32, version: u32) -> Self {
55 let headers = Headers::new(records.len(), batch_size, version);
56 Self::new(headers, records, 0)
57 }
58
59 pub fn encode(&self, buffer: &mut Vec<u8>) {
61 self.headers.encode(buffer);
62 buffer.put_slice(&self.records);
63 let crc = crc32c(buffer);
64 buffer.put_u32(crc);
65 }
66
67 pub fn headers(&self) -> &Headers {
69 &self.headers
70 }
71
72 pub fn records(&self) -> &[u8] {
74 &self.records
75 }
76}