kekbit_core/
header.rs

1//!Handles metadata associated with a channel.
2use crate::api::ChannelError;
3use crate::api::ChannelError::{IncompatibleVersion, InvalidCapacity, InvalidMaxMessageLength, InvalidSignature};
4use crate::tick::TickUnit;
5use crate::utils::{align, is_aligned, REC_HEADER_LEN};
6use crate::version::Version;
7use std::cmp::max;
8use std::cmp::min;
9
/// Length in bytes of the serialized channel metadata.
const HEADER_LEN: usize = 128;
/// Magic number stored at the very beginning of the metadata:
/// the ASCII bytes `*KEKBIT*` interpreted as a little-endian `u64`.
const SIGNATURE: u64 = 0x2A54_4942_4B45_4B2A;
/// Smallest capacity, in bytes, a channel is allowed to have (16 KiB).
const MIN_CAPACITY: u32 = 16 * 1024;
13
14#[inline]
15const fn compute_max_msg_len(capacity: u32) -> u32 {
16    //if you reduce MIN_CAPACITY this may underflow!
17    (capacity >> 7) - (REC_HEADER_LEN as u32)
18}
19
/// Defines and validates the metadata associated with a channel.
///
/// Instances are created either from scratch with `Header::new` or by decoding
/// an existing byte buffer with `Header::read`.
#[derive(PartialEq, Eq, Debug)]
pub struct Header {
    // Identifier of the channel's writer.
    writer_id: u64,
    // Identifier of the channel.
    channel_id: u64,
    // Capacity of the channel, in bytes; never below MIN_CAPACITY.
    capacity: u32,
    // Maximum message length allowed, bounded by a fraction of the capacity.
    max_msg_len: u32,
    // Write inactivity interval, expressed in `tick_unit`s.
    timeout: u64,
    // Creation time of the channel, expressed in `tick_unit`s.
    creation_time: u64,
    // Time unit used by `timeout` and `creation_time`.
    tick_unit: TickUnit,
    // Version of the metadata format.
    version: Version,
}
32
33#[allow(clippy::len_without_is_empty)]
34impl Header {
35    /// Defines a new channel header.
36    ///
37    /// Return a struct that contains all the metadata required to be associated with a new channel.
38    ///
39    /// # Arguments
40    ///
41    /// * `writer_id` - Channel's writer identifier
42    /// * `channel_id` - Channel's identifier
43    /// * `capacity_hint` - Hint for the size of the channel - the maximum amount of data that can be wrote into the channel.
44    ///                  Usually a successfully created channel will have a size very close to this hint, probably a little larger.
45    /// * `max_msg_len_hint` - Hint for the maximum size of a message wrote into the channel. This cannot be larger than a certain fraction.
46    ///        of the channel's capacity(1/128th), so the new created channel may have max message length value smaller than this hint.
47    /// * `timeout` - Specifies the write inactivity time interval after each the reader will consider the channel abandoned by the writer.
48    /// * `tick_unit` - Time unit used by the timeout and creation time attributes.        
49    ///
50    /// # Example
51    ///
52    /// ```
53    /// use kekbit_core::tick::TickUnit::Nanos;
54    /// use kekbit_core::header::*;
55    ///     
56    /// let producer_id: u64 = 111;
57    /// let channel_id: u64 = 101;
58    /// let capacity: u32 = 10_001;
59    /// let max_msg_len: u32 = 100;
60    /// let timeout: u64 = 10_000;
61    /// let tick_unit = Nanos;
62    /// let header = Header::new(channel_id, producer_id, capacity, max_msg_len, timeout, tick_unit);
63    /// println!("{:?}", &header);
64    /// ````
65    ///
66    ///
67    #[inline]
68    pub fn new(
69        writer_id: u64,
70        channel_id: u64,
71        capacity_hint: u32,
72        max_msg_len_hint: u32,
73        timeout: u64,
74        tick_unit: TickUnit,
75    ) -> Header {
76        let capacity = max(MIN_CAPACITY, align(capacity_hint));
77        let max_msg_len = align(min(max_msg_len_hint + REC_HEADER_LEN, compute_max_msg_len(capacity)) as u32);
78        let creation_time = tick_unit.nix_time();
79        Header {
80            writer_id,
81            channel_id,
82            capacity,
83            max_msg_len,
84            timeout,
85            creation_time,
86            tick_unit,
87            version: Version::latest(),
88        }
89    }
90    ///Reads and `validates` the metadata from an existing memory mapped channel.
91    ///
92    ///Returns the metadata associated with the channel.
93    ///
94    /// # Arguments
95    ///
96    /// * `header` - Reference to a  byte array which should contain metadata associated with a given channel.
97    ///              Usually points at the beginning of a memory mapped file used as storage for a kekbit channel.
98    ///
99    /// # Errors
100    ///     
101    /// An error will occur if data is corrupted or points to an incompatible version of kekbit channel.
102    ///
103    /// # Example
104    ///
105    ///```
106    /// use memmap::MmapOptions;
107    /// use std::fs::OpenOptions;
108    ///
109    /// # use kekbit_core::tick::TickUnit::Nanos;
110    /// # use kekbit_core::header::Header;
111    /// # use kekbit_codecs::codecs::raw::RawBinDataFormat;
112    /// use kekbit_core::shm::*;
113    /// # const FOREVER: u64 = 99_999_999_999;
114    /// let writer_id = 1850;
115    /// let channel_id = 4242;
116    /// # let header = Header::new(writer_id, channel_id, 300_000, 1000, FOREVER, Nanos);
117    /// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
118    /// let dir_path = test_tmp_dir.path();
119    ///  # let writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
120    ///
121    /// let kek_file_name = storage_path(dir_path, channel_id);
122    /// let kek_file = OpenOptions::new()
123    ///  .write(true)
124    ///  .read(true)
125    ///  .open(&kek_file_name).unwrap();
126    ///  let mut mmap = unsafe { MmapOptions::new().map_mut(&kek_file) }.unwrap();
127    ///  let buf = &mut mmap[..];
128    ///  let header = Header::read(buf).unwrap();
129    ///  println!("{:?}", &header);
130    ///  ```
131    ///    
132    pub fn read(header: &[u8]) -> Result<Header, ChannelError> {
133        assert!(header.len() >= HEADER_LEN);
134        let mut offset = 0;
135        let signature = Header::read_u64(header, offset);
136        if signature != SIGNATURE {
137            return Err(InvalidSignature {
138                expected: SIGNATURE,
139                actual: signature,
140            });
141        }
142        offset += 8;
143        let version: Version = Header::read_u64(header, 8).into();
144        let latest = Version::latest();
145        if !latest.is_compatible(version) {
146            return Err(IncompatibleVersion {
147                expected: latest.into(),
148                actual: version.into(),
149            });
150        }
151        offset += 8;
152        let writer_id = Header::read_u64(header, offset);
153        offset += 8;
154        let channel_id = Header::read_u64(header, offset);
155        offset += 8;
156        let capacity = Header::read_u32(header, offset);
157        if capacity < MIN_CAPACITY {
158            return Err(InvalidCapacity {
159                capacity,
160                msg: "Capacity below minimum allowed of 10KB",
161            });
162        }
163        if !is_aligned(MIN_CAPACITY) {
164            return Err(InvalidCapacity {
165                capacity,
166                msg: "Capacity is not 8 bytes aligned",
167            });
168        }
169        offset += 4;
170        let max_msg_len = Header::read_u32(header, offset);
171        if max_msg_len > align(compute_max_msg_len(capacity)) {
172            return Err(InvalidMaxMessageLength {
173                msg_len: max_msg_len,
174                msg: "Max message lenght is too large",
175            });
176        }
177        if !is_aligned(max_msg_len) {
178            return Err(InvalidMaxMessageLength {
179                msg_len: max_msg_len,
180                msg: "Max message length is not 8 bytes aligned",
181            });
182        }
183        offset += 4;
184        let timeout = Header::read_u64(header, offset);
185        offset += 8;
186        let creation_time = Header::read_u64(header, offset);
187        offset += 8;
188        let tick_unit = TickUnit::from_id(header[offset]);
189        //offset += 1;
190        Ok(Header {
191            version,
192            writer_id,
193            channel_id,
194            capacity,
195            max_msg_len,
196            timeout,
197            creation_time,
198            tick_unit,
199        })
200    }
201    ///Writes kekbit metadata to a memory mapepd file.
202    ///
203    /// Returns  the lenght of the metadata
204    ///
205    /// # Arguments
206    ///
207    /// * `header` - Reference to a byte slice where metadata must be written.
208    ///              Usually points at the beginning of a memory mapped file used as storage for a kekbit channel.
209    ///
210    /// # Example
211    ///
212    ///```
213    /// use memmap::MmapOptions;
214    /// use std::fs::OpenOptions;
215    ///
216    /// use kekbit_core::tick::TickUnit::Nanos;
217    /// use kekbit_core::header::Header;
218    /// use kekbit_core::shm::*;
219    /// use std::fs::DirBuilder;
220    ///
221    /// const FOREVER: u64 = 99_999_999_999;
222    /// let writer_id = 1850;
223    /// let channel_id = 42;
224    /// let test_tmp_dir = tempdir::TempDir::new("keksample").unwrap();
225    /// let dir_path = test_tmp_dir.path().join(writer_id.to_string());
226    /// let mut builder = DirBuilder::new();
227    /// builder.recursive(true);
228    /// builder.create(&dir_path).or_else(|err| Err(err.to_string())).unwrap();
229    ///
230    /// let kek_file_name = dir_path.join(format!("{}.kekbit", channel_id));
231    /// let kek_file = OpenOptions::new()
232    /// .write(true)
233    /// .read(true)
234    /// .create(true)
235    /// .open(&kek_file_name)
236    /// .or_else(|err| Err(err.to_string())).unwrap();
237    ///
238    /// let header = Header::new(writer_id, channel_id, 300_000, 1000, FOREVER, Nanos);
239    /// let total_len = (header.capacity() + header.len() as u32) as u64;
240    /// kek_file.set_len(total_len).or_else(|err| Err(err.to_string())).unwrap();
241    /// let mut mmap = unsafe { MmapOptions::new().map_mut(&kek_file) }.unwrap();
242    /// let buf = &mut mmap[..];
243    /// header.write_to(buf);
244    /// mmap.flush().unwrap();
245    /// ```
246    #[inline]
247    pub fn write_to(&self, header: &mut [u8]) -> usize {
248        assert!(self.len() <= header.len());
249        header[0..8].clone_from_slice(&SIGNATURE.to_le_bytes());
250        let latest_v: u64 = Version::latest().into();
251        header[8..16].clone_from_slice(&latest_v.to_le_bytes());
252        header[16..24].clone_from_slice(&self.writer_id.to_le_bytes());
253        header[24..32].clone_from_slice(&self.channel_id.to_le_bytes());
254        header[32..36].clone_from_slice(&self.capacity.to_le_bytes());
255        header[36..40].clone_from_slice(&self.max_msg_len.to_le_bytes());
256        header[40..48].clone_from_slice(&self.timeout.to_le_bytes());
257        header[48..56].clone_from_slice(&self.creation_time.to_le_bytes());
258        header[56] = self.tick_unit.id();
259        let last = 57;
260        for item in header.iter_mut().take(HEADER_LEN).skip(last) {
261            *item = 0u8;
262        }
263        self.len()
264    }
265
266    #[inline]
267    fn read_u64(header: &[u8], offset: usize) -> u64 {
268        assert!(offset + 8 < HEADER_LEN);
269        u64::from_le_bytes([
270            header[offset],
271            header[offset + 1],
272            header[offset + 2],
273            header[offset + 3],
274            header[offset + 4],
275            header[offset + 5],
276            header[offset + 6],
277            header[offset + 7],
278        ])
279    }
280
281    #[inline]
282    fn read_u32(header: &[u8], offset: usize) -> u32 {
283        assert!(offset + 4 < HEADER_LEN);
284        u32::from_le_bytes([header[offset], header[offset + 1], header[offset + 2], header[offset + 3]])
285    }
286
287    ///Returns the metadata version
288    #[inline]
289    pub fn version(&self) -> String {
290        self.version.to_string()
291    }
292
293    ///Returns the channel identifier
294    #[inline]
295    pub fn channel_id(&self) -> u64 {
296        self.channel_id
297    }
298
299    ///Returns the channel writer identifier
300    #[inline]
301    pub fn writer_id(&self) -> u64 {
302        self.writer_id
303    }
304
305    ///Returns the capacity of the channel
306    #[inline]
307    pub fn capacity(&self) -> u32 {
308        self.capacity
309    }
310
311    ///Returns the maximum message size allowed
312    #[inline]
313    pub fn max_msg_len(&self) -> u32 {
314        self.max_msg_len
315    }
316    ///Returns the inactivity time interval after each the reader will consider the channel abandoned by the writer.
317    #[inline]
318    pub fn timeout(&self) -> u64 {
319        self.timeout
320    }
321
322    ///Returns the channel creation time
323    #[inline]
324    pub fn creation_time(&self) -> u64 {
325        self.creation_time
326    }
327    ///Returns the time unit used by the channel creation time and the timeout attributes.
328    #[inline]
329    pub fn tick_unit(&self) -> TickUnit {
330        self.tick_unit
331    }
332    #[inline]
333    ///Returns  the length of the metadata. For any given version the length is the same.
334    ///In the current version it is 128 bytes.
335    pub const fn len(&self) -> usize {
336        HEADER_LEN
337    }
338}
339
#[cfg(test)]
mod tests {
    use super::*;

    /// Writes a freshly created header into a buffer, reads it back and checks
    /// that the round trip and the accessors report the expected values.
    #[test]
    fn check_read_write_header() {
        let producer_id: u64 = 111;
        let channel_id: u64 = 101;
        let capacity: u32 = 10_001;
        let max_msg_len: u32 = 100;
        let timeout: u64 = 10_000;
        let tick_unit = TickUnit::Nanos;
        let head = Header::new(producer_id, channel_id, capacity, max_msg_len, timeout, tick_unit);
        let mut data = vec![0u8; HEADER_LEN];
        assert_eq!(head.write_to(&mut data), HEADER_LEN);
        assert_eq!(Header::read(&data).unwrap(), head);
        assert_eq!(head.tick_unit(), TickUnit::Nanos);
        assert_eq!(head.timeout(), timeout);
        assert_eq!(head.version(), Version::latest().to_string());
        // `<=` rather than `<`: with a coarse clock both readings may be equal.
        assert!(head.creation_time() <= tick_unit.nix_time());
        assert_eq!(head.len(), 128);
        assert_eq!(head.writer_id(), producer_id);
        assert_eq!(head.channel_id(), channel_id);
        // The capacity hint is below the minimum, so it must have been raised.
        assert!(head.capacity() >= MIN_CAPACITY);
        assert!(head.capacity() >= capacity);
        assert!(head.max_msg_len() <= align(compute_max_msg_len(head.capacity())));
    }
}