kekbit_core/header.rs
1//!Handles metadata associated with a channel.
2use crate::api::ChannelError;
3use crate::api::ChannelError::{IncompatibleVersion, InvalidCapacity, InvalidMaxMessageLength, InvalidSignature};
4use crate::tick::TickUnit;
5use crate::utils::{align, is_aligned, REC_HEADER_LEN};
6use crate::version::Version;
7use std::cmp::max;
8use std::cmp::min;
9
10const MIN_CAPACITY: u32 = 1024 * 16;
11const HEADER_LEN: usize = 128;
12const SIGNATURE: u64 = 0x2A54_4942_4B45_4B2A; //"*KEKBIT*" as bytes as u64
13
14#[inline]
15const fn compute_max_msg_len(capacity: u32) -> u32 {
16 //if you reduce MIN_CAPACITY this may underflow!
17 (capacity >> 7) - (REC_HEADER_LEN as u32)
18}
19
20/// Defines and validates the metadata associated with a channel.
21#[derive(PartialEq, Eq, Debug)]
22pub struct Header {
23 writer_id: u64,
24 channel_id: u64,
25 capacity: u32,
26 max_msg_len: u32,
27 timeout: u64,
28 creation_time: u64,
29 tick_unit: TickUnit,
30 version: Version,
31}
32
33#[allow(clippy::len_without_is_empty)]
34impl Header {
35 /// Defines a new channel header.
36 ///
37 /// Return a struct that contains all the metadata required to be associated with a new channel.
38 ///
39 /// # Arguments
40 ///
41 /// * `writer_id` - Channel's writer identifier
42 /// * `channel_id` - Channel's identifier
43 /// * `capacity_hint` - Hint for the size of the channel - the maximum amount of data that can be wrote into the channel.
44 /// Usually a successfully created channel will have a size very close to this hint, probably a little larger.
45 /// * `max_msg_len_hint` - Hint for the maximum size of a message wrote into the channel. This cannot be larger than a certain fraction.
46 /// of the channel's capacity(1/128th), so the new created channel may have max message length value smaller than this hint.
47 /// * `timeout` - Specifies the write inactivity time interval after each the reader will consider the channel abandoned by the writer.
48 /// * `tick_unit` - Time unit used by the timeout and creation time attributes.
49 ///
50 /// # Example
51 ///
52 /// ```
53 /// use kekbit_core::tick::TickUnit::Nanos;
54 /// use kekbit_core::header::*;
55 ///
56 /// let producer_id: u64 = 111;
57 /// let channel_id: u64 = 101;
58 /// let capacity: u32 = 10_001;
59 /// let max_msg_len: u32 = 100;
60 /// let timeout: u64 = 10_000;
61 /// let tick_unit = Nanos;
62 /// let header = Header::new(channel_id, producer_id, capacity, max_msg_len, timeout, tick_unit);
63 /// println!("{:?}", &header);
64 /// ````
65 ///
66 ///
67 #[inline]
68 pub fn new(
69 writer_id: u64,
70 channel_id: u64,
71 capacity_hint: u32,
72 max_msg_len_hint: u32,
73 timeout: u64,
74 tick_unit: TickUnit,
75 ) -> Header {
76 let capacity = max(MIN_CAPACITY, align(capacity_hint));
77 let max_msg_len = align(min(max_msg_len_hint + REC_HEADER_LEN, compute_max_msg_len(capacity)) as u32);
78 let creation_time = tick_unit.nix_time();
79 Header {
80 writer_id,
81 channel_id,
82 capacity,
83 max_msg_len,
84 timeout,
85 creation_time,
86 tick_unit,
87 version: Version::latest(),
88 }
89 }
90 ///Reads and `validates` the metadata from an existing memory mapped channel.
91 ///
92 ///Returns the metadata associated with the channel.
93 ///
94 /// # Arguments
95 ///
96 /// * `header` - Reference to a byte array which should contain metadata associated with a given channel.
97 /// Usually points at the beginning of a memory mapped file used as storage for a kekbit channel.
98 ///
99 /// # Errors
100 ///
101 /// An error will occur if data is corrupted or points to an incompatible version of kekbit channel.
102 ///
103 /// # Example
104 ///
105 ///```
106 /// use memmap::MmapOptions;
107 /// use std::fs::OpenOptions;
108 ///
109 /// # use kekbit_core::tick::TickUnit::Nanos;
110 /// # use kekbit_core::header::Header;
111 /// # use kekbit_codecs::codecs::raw::RawBinDataFormat;
112 /// use kekbit_core::shm::*;
113 /// # const FOREVER: u64 = 99_999_999_999;
114 /// let writer_id = 1850;
115 /// let channel_id = 4242;
116 /// # let header = Header::new(writer_id, channel_id, 300_000, 1000, FOREVER, Nanos);
117 /// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
118 /// let dir_path = test_tmp_dir.path();
119 /// # let writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
120 ///
121 /// let kek_file_name = storage_path(dir_path, channel_id);
122 /// let kek_file = OpenOptions::new()
123 /// .write(true)
124 /// .read(true)
125 /// .open(&kek_file_name).unwrap();
126 /// let mut mmap = unsafe { MmapOptions::new().map_mut(&kek_file) }.unwrap();
127 /// let buf = &mut mmap[..];
128 /// let header = Header::read(buf).unwrap();
129 /// println!("{:?}", &header);
130 /// ```
131 ///
132 pub fn read(header: &[u8]) -> Result<Header, ChannelError> {
133 assert!(header.len() >= HEADER_LEN);
134 let mut offset = 0;
135 let signature = Header::read_u64(header, offset);
136 if signature != SIGNATURE {
137 return Err(InvalidSignature {
138 expected: SIGNATURE,
139 actual: signature,
140 });
141 }
142 offset += 8;
143 let version: Version = Header::read_u64(header, 8).into();
144 let latest = Version::latest();
145 if !latest.is_compatible(version) {
146 return Err(IncompatibleVersion {
147 expected: latest.into(),
148 actual: version.into(),
149 });
150 }
151 offset += 8;
152 let writer_id = Header::read_u64(header, offset);
153 offset += 8;
154 let channel_id = Header::read_u64(header, offset);
155 offset += 8;
156 let capacity = Header::read_u32(header, offset);
157 if capacity < MIN_CAPACITY {
158 return Err(InvalidCapacity {
159 capacity,
160 msg: "Capacity below minimum allowed of 10KB",
161 });
162 }
163 if !is_aligned(MIN_CAPACITY) {
164 return Err(InvalidCapacity {
165 capacity,
166 msg: "Capacity is not 8 bytes aligned",
167 });
168 }
169 offset += 4;
170 let max_msg_len = Header::read_u32(header, offset);
171 if max_msg_len > align(compute_max_msg_len(capacity)) {
172 return Err(InvalidMaxMessageLength {
173 msg_len: max_msg_len,
174 msg: "Max message lenght is too large",
175 });
176 }
177 if !is_aligned(max_msg_len) {
178 return Err(InvalidMaxMessageLength {
179 msg_len: max_msg_len,
180 msg: "Max message length is not 8 bytes aligned",
181 });
182 }
183 offset += 4;
184 let timeout = Header::read_u64(header, offset);
185 offset += 8;
186 let creation_time = Header::read_u64(header, offset);
187 offset += 8;
188 let tick_unit = TickUnit::from_id(header[offset]);
189 //offset += 1;
190 Ok(Header {
191 version,
192 writer_id,
193 channel_id,
194 capacity,
195 max_msg_len,
196 timeout,
197 creation_time,
198 tick_unit,
199 })
200 }
201 ///Writes kekbit metadata to a memory mapepd file.
202 ///
203 /// Returns the lenght of the metadata
204 ///
205 /// # Arguments
206 ///
207 /// * `header` - Reference to a byte slice where metadata must be written.
208 /// Usually points at the beginning of a memory mapped file used as storage for a kekbit channel.
209 ///
210 /// # Example
211 ///
212 ///```
213 /// use memmap::MmapOptions;
214 /// use std::fs::OpenOptions;
215 ///
216 /// use kekbit_core::tick::TickUnit::Nanos;
217 /// use kekbit_core::header::Header;
218 /// use kekbit_core::shm::*;
219 /// use std::fs::DirBuilder;
220 ///
221 /// const FOREVER: u64 = 99_999_999_999;
222 /// let writer_id = 1850;
223 /// let channel_id = 42;
224 /// let test_tmp_dir = tempdir::TempDir::new("keksample").unwrap();
225 /// let dir_path = test_tmp_dir.path().join(writer_id.to_string());
226 /// let mut builder = DirBuilder::new();
227 /// builder.recursive(true);
228 /// builder.create(&dir_path).or_else(|err| Err(err.to_string())).unwrap();
229 ///
230 /// let kek_file_name = dir_path.join(format!("{}.kekbit", channel_id));
231 /// let kek_file = OpenOptions::new()
232 /// .write(true)
233 /// .read(true)
234 /// .create(true)
235 /// .open(&kek_file_name)
236 /// .or_else(|err| Err(err.to_string())).unwrap();
237 ///
238 /// let header = Header::new(writer_id, channel_id, 300_000, 1000, FOREVER, Nanos);
239 /// let total_len = (header.capacity() + header.len() as u32) as u64;
240 /// kek_file.set_len(total_len).or_else(|err| Err(err.to_string())).unwrap();
241 /// let mut mmap = unsafe { MmapOptions::new().map_mut(&kek_file) }.unwrap();
242 /// let buf = &mut mmap[..];
243 /// header.write_to(buf);
244 /// mmap.flush().unwrap();
245 /// ```
246 #[inline]
247 pub fn write_to(&self, header: &mut [u8]) -> usize {
248 assert!(self.len() <= header.len());
249 header[0..8].clone_from_slice(&SIGNATURE.to_le_bytes());
250 let latest_v: u64 = Version::latest().into();
251 header[8..16].clone_from_slice(&latest_v.to_le_bytes());
252 header[16..24].clone_from_slice(&self.writer_id.to_le_bytes());
253 header[24..32].clone_from_slice(&self.channel_id.to_le_bytes());
254 header[32..36].clone_from_slice(&self.capacity.to_le_bytes());
255 header[36..40].clone_from_slice(&self.max_msg_len.to_le_bytes());
256 header[40..48].clone_from_slice(&self.timeout.to_le_bytes());
257 header[48..56].clone_from_slice(&self.creation_time.to_le_bytes());
258 header[56] = self.tick_unit.id();
259 let last = 57;
260 for item in header.iter_mut().take(HEADER_LEN).skip(last) {
261 *item = 0u8;
262 }
263 self.len()
264 }
265
266 #[inline]
267 fn read_u64(header: &[u8], offset: usize) -> u64 {
268 assert!(offset + 8 < HEADER_LEN);
269 u64::from_le_bytes([
270 header[offset],
271 header[offset + 1],
272 header[offset + 2],
273 header[offset + 3],
274 header[offset + 4],
275 header[offset + 5],
276 header[offset + 6],
277 header[offset + 7],
278 ])
279 }
280
281 #[inline]
282 fn read_u32(header: &[u8], offset: usize) -> u32 {
283 assert!(offset + 4 < HEADER_LEN);
284 u32::from_le_bytes([header[offset], header[offset + 1], header[offset + 2], header[offset + 3]])
285 }
286
287 ///Returns the metadata version
288 #[inline]
289 pub fn version(&self) -> String {
290 self.version.to_string()
291 }
292
293 ///Returns the channel identifier
294 #[inline]
295 pub fn channel_id(&self) -> u64 {
296 self.channel_id
297 }
298
299 ///Returns the channel writer identifier
300 #[inline]
301 pub fn writer_id(&self) -> u64 {
302 self.writer_id
303 }
304
305 ///Returns the capacity of the channel
306 #[inline]
307 pub fn capacity(&self) -> u32 {
308 self.capacity
309 }
310
311 ///Returns the maximum message size allowed
312 #[inline]
313 pub fn max_msg_len(&self) -> u32 {
314 self.max_msg_len
315 }
316 ///Returns the inactivity time interval after each the reader will consider the channel abandoned by the writer.
317 #[inline]
318 pub fn timeout(&self) -> u64 {
319 self.timeout
320 }
321
322 ///Returns the channel creation time
323 #[inline]
324 pub fn creation_time(&self) -> u64 {
325 self.creation_time
326 }
327 ///Returns the time unit used by the channel creation time and the timeout attributes.
328 #[inline]
329 pub fn tick_unit(&self) -> TickUnit {
330 self.tick_unit
331 }
332 #[inline]
333 ///Returns the length of the metadata. For any given version the length is the same.
334 ///In the current version it is 128 bytes.
335 pub const fn len(&self) -> usize {
336 HEADER_LEN
337 }
338}
339
340#[cfg(test)]
341mod tests {
342 use super::*;
343 #[test]
344 fn check_read_write_header() {
345 let producer_id: u64 = 111;
346 let channel_id: u64 = 101;
347 let capacity: u32 = 10_001;
348 let max_msg_len: u32 = 100;
349 let timeout: u64 = 10_000;
350 let tick_unit = TickUnit::Nanos;
351 let head = Header::new(producer_id, channel_id, capacity, max_msg_len, timeout, tick_unit);
352 let mut data = vec![0u8; HEADER_LEN];
353 assert!(head.write_to(&mut data) == HEADER_LEN);
354 assert!(Header::read(&data).unwrap() == head);
355 assert_eq!(head.tick_unit(), TickUnit::Nanos);
356 assert_eq!(head.timeout(), timeout);
357 assert_eq!(head.version(), Version::latest().to_string());
358 assert!(head.creation_time() < tick_unit.nix_time());
359 assert_eq!(head.len(), 128);
360 assert_eq!(head.writer_id(), producer_id);
361 }
362}