shared-mem-queue 0.4.0

Single-writer single-reader queues which can be used for inter-processor-communication in a shared memory region
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
// Copyright Open Logistics Foundation
//
// Licensed under the Open Logistics Foundation License 1.3.
// For details on the licensing terms, see the LICENSE file.
// SPDX-License-Identifier: OLFL-1.3

//! FIFO queue with a packet-based interface preserving packet boundaries
//!
//! The `MsgQueue` is implemented on top of the [`ByteQueue`]. It handles variable-sized messages
//! using the following format:
//!
//! | Field            | Size                                               |
//! |------------------|----------------------------------------------------|
//! | Message Prefix   | Fixed size, configurable                           |
//! | Protocol Version | 1 byte                                             |
//! | Data Size        | 4 bytes in little endian                           |
//! | Header CRC       | 4 bytes in little endian                           |
//! | Data             | variable-sized - determined by the data size field |
//! | Message CRC      | 4 bytes in little endian                           |
//!
//! - The **Message Prefix** is a fixed-size field provided by the user to identify the beginning
//!   of a new message. If it is guaranteed that the memory will not be written by other processes,
//!   it may also be left blank (`b""`) but a short prefix will also not hurt much and provide
//!   more robustness. Theoretically, the prefix could also be used to categorize
//!   messages, i.e. to multiplex different message channels over the same memory. But currently,
//!   this is not implemented and there are currently no plans to do so and a `MsgQueue` will
//!   consider all bytes preceding the prefix as garbage and remove them.
//! - The **Protocol Version** is a 1 byte value specifying the version of the communication
//!   protocol being used for compatibility checks. It is set to a constant value for each protocol
//!   version and guarantees robustness between library versions with protocol changes.
//! - The **Data Size** specifies the length of the message data.
//! - The **Header CRC** checks the integrity of the Message Prefix, Protocol Version, and Data
//!   Size. This has been introduced to prevent getting stuck when the Data Size field contains
//!   garbage which is interpreted as huge message length so that the `MsgQueue` would wait
//!   forever. Since the library is `no_std`, this approach has been taken instead of a classic
//!   timeout.
//! - The **Data** field contains the actual message data.
//! - The **Message CRC** ensures the integrity of the entire message packet.

use crate::byte_queue::ByteQueue;
use core::convert::TryFrom;
use crc::{Crc, Digest, CRC_32_ISO_HDLC};

/// The `MsgQueue` queue type using `ByteQueue` as the underlying communication mechanism. Read the
/// crate and module documentation for further information and usage examples.
#[derive(Debug)]
pub struct MsgQueue<'a, const LEN: usize> {
    byte_queue: ByteQueue,
    prefix: &'a [u8],
    rx_buf: [u8; LEN],
    /// Determines how many bytes in the rx_buf are valid. `rx_buf_len` is the number of valid
    /// bytes, `rx_buf.len()` is the capacity of the receive buffer.
    rx_buf_len: usize,
    has_received_full_msg: bool,
}

use core::fmt;

#[derive(Debug, PartialEq)]
pub enum MqError {
    MqFull,
    MqEmpty,
    MqCrcErr,
    MqMsgTooBig,
    MqWrongProtocolVersion,
}

impl fmt::Display for MqError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            MqError::MqFull => write!(f, "Message queue is full"),
            MqError::MqEmpty => write!(f, "Message queue is empty"),
            MqError::MqCrcErr => write!(f, "CRC check failed"),
            MqError::MqMsgTooBig => write!(f, "Message is too big"),
            MqError::MqWrongProtocolVersion => {
                write!(f, "Message protocol version is incompatible")
            }
        }
    }
}

impl core::error::Error for MqError {}

const PROTOCOL_VERSION: u8 = 1;

const MSG_SIZE_FIELD_SIZE: usize = core::mem::size_of::<u32>();
const MSG_CRC_FIELD_SIZE: usize = core::mem::size_of::<u32>();
const MSG_PROTOCOL_FIELD_SIZE: usize = core::mem::size_of::<u8>();

const CRC32: Crc<u32> = Crc::<u32>::new(&CRC_32_ISO_HDLC);

impl<'a, const LEN: usize> MsgQueue<'a, LEN> {
    /// Initializes a message queue for interpreting messages with the specified prefix, using the
    /// provided `rx_buf` as storage for incoming messages. The size of `rx_buf` determines how
    /// many messages can be stored before they need to be consumed or read.
    pub fn new(byte_queue: ByteQueue, prefix: &'a [u8], rx_buf: [u8; LEN]) -> Self {
        Self {
            byte_queue,
            prefix,
            rx_buf,
            rx_buf_len: 0,
            has_received_full_msg: false,
        }
    }

    //
    // Read methods
    //

    /// Returns the length of the *p*refix
    fn len_p(&self) -> usize {
        self.prefix.len()
    }

    /// Returns the length of the *p*refix + protocol *v*ersion
    fn len_pv(&self) -> usize {
        self.len_p() + MSG_PROTOCOL_FIELD_SIZE
    }

    /// Returns the length of the *p*refix + protocol *v*ersion byte + *l*ength-field
    fn len_pvl(&self) -> usize {
        self.len_pv() + MSG_SIZE_FIELD_SIZE
    }

    /// Returns the length of the *p*refix + protocol *v*ersion byte + *l*ength-field + header *c*rc
    fn len_pvlc(&self) -> usize {
        self.len_pvl() + MSG_CRC_FIELD_SIZE
    }

    /// Returns the length of the *p*refix + protocol *v*ersion byte + *l*ength-field + header *c*rc + *d*ata
    fn len_pvlcd(&self, msg_len: usize) -> usize {
        self.len_pvlc() + msg_len
    }

    /// Returns the length of the *p*refix + protocol *v*ersion byte + *l*ength-field
    ///     + header *c*rc + *d*ata + *c*rc - the total length of the message packet
    fn len_pvlcdc(&self, msg_len: usize) -> usize {
        self.len_pvlcd(msg_len) + MSG_CRC_FIELD_SIZE
    }

    /// Reads new bytes from the byte queue into the receive buffer
    fn read_bytes(&mut self) {
        let read_bytes_len = self
            .byte_queue
            .consume_at_most(&mut self.rx_buf[self.rx_buf_len..]);
        self.rx_buf_len += read_bytes_len;
    }

    /// Removes `skip` bytes in the receive buffer to remove an old message or to remove garbage
    /// bytes. `skip` must be less than or equal to the current length of the receive buffer.
    fn skip_in_rx_buf(&mut self, skip: usize) {
        assert!(
            skip <= self.rx_buf_len,
            "skip rx_buffer value exceeds current rx_buffer length."
        );

        self.rx_buf.copy_within(skip..self.rx_buf_len, 0);
        self.rx_buf_len -= skip;
    }

    /// Removes the old message from the receive buffer, if there is one.
    fn rm_old_msg(&mut self) {
        if self.has_received_full_msg {
            let msg_len = self.try_extract_msg_len().unwrap(); // len must exist after receiving
                                                               // a complete msg
            if msg_len <= self.rx_buf_len {
                // not required, just for safety
                self.skip_in_rx_buf(self.len_pvlcdc(msg_len));
            }
            self.has_received_full_msg = false;
        }
    }

    fn invalidate_current_msg(&mut self) {
        self.skip_in_rx_buf(1);
    }

    /// Finds the prefix in the received bytes and skips to it. Returns `Ok(())` if this
    /// is successful.
    fn try_advance_to_prefix(&mut self) -> Result<(), MqError> {
        let mut pos = None;

        for (idx, window) in self.rx_buf[..self.rx_buf_len]
            .windows(self.prefix.len())
            .enumerate()
        {
            if self.prefix == window {
                pos = Some(idx);
                break;
            }
        }

        if let Some(idx) = pos {
            self.skip_in_rx_buf(idx);
            return Ok(());
        }

        /*
         * No *full* prefix found - skip all but the last prefix.len() bytes if present, because
         * they could be the beginning of the next message's prefix
         */
        if self.rx_buf_len >= self.prefix.len() {
            self.skip_in_rx_buf(self.rx_buf_len - self.prefix.len());
        }

        Err(MqError::MqEmpty)
    }

    /// Determines the length of the message, i.e. reads the length-field. This requires that
    /// enough bytes have already been read for the next message.
    fn try_extract_msg_len(&self) -> Result<usize, MqError> {
        if self.rx_buf_len < self.len_pvl() {
            return Err(MqError::MqEmpty);
        }
        let start = self.len_pv();
        let end = start + MSG_SIZE_FIELD_SIZE;
        let slice = &self.rx_buf[start..end];

        let mut array = [0u8; MSG_SIZE_FIELD_SIZE];
        array.copy_from_slice(slice);

        Ok(u32::from_le_bytes(array) as usize)
    }

    /// Checks that the message length does not exceed the receive buffer
    fn verify_msg_packet_len(&mut self, msg_len: usize) -> Result<(), MqError> {
        // Check that the message fits into our receive buffer
        if self.rx_buf.len() < self.len_pvlcdc(msg_len) {
            self.invalidate_current_msg();
            return Err(MqError::MqMsgTooBig);
        }
        Ok(())
    }

    /// Checks if the message protocol version matches with the current version for parsing
    fn verify_protocol_version(&mut self) -> Result<(), MqError> {
        if self.rx_buf_len < self.len_pv() {
            return Err(MqError::MqEmpty);
        }
        if self.rx_buf[self.len_p()] != PROTOCOL_VERSION {
            self.invalidate_current_msg();
            return Err(MqError::MqWrongProtocolVersion);
        }
        Ok(())
    }

    /// Compare the message crc with the calculated crc
    fn verify_crc(&mut self, crc_start: usize, calculated_crc: u32) -> Result<(), MqError> {
        if self.rx_buf_len < crc_start + MSG_CRC_FIELD_SIZE {
            return Err(MqError::MqEmpty); // incomplete crc bits
        }
        let crc_end = crc_start + MSG_CRC_FIELD_SIZE;
        let mut crc_array = [0u8; MSG_CRC_FIELD_SIZE];
        crc_array.copy_from_slice(&self.rx_buf[crc_start..crc_end]);
        let received_crc = u32::from_le_bytes(crc_array);

        if received_crc != calculated_crc {
            self.invalidate_current_msg();
            return Err(MqError::MqCrcErr);
        }
        Ok(())
    }

    /// Checks the completeness and validity of the message and returns its length or the
    /// appropriate error
    fn verify_full_msg(&mut self) -> Result<usize, MqError> {
        self.verify_protocol_version()?;
        let msg_len = self.try_extract_msg_len()?;
        self.verify_crc(
            self.len_pvl(),
            CRC32.checksum(&self.rx_buf[..self.len_pvl()]),
        )?;
        // The message length is verified after the header CRC has been checked. This is important
        // because the message length may be faulty and we can only rely on it after the CRC check
        // has been passed.
        self.verify_msg_packet_len(msg_len)?;
        // verify_crc waits for enough data to be received so this implicitly checks that the
        // message is complete
        self.verify_crc(
            self.len_pvlcd(msg_len),
            CRC32.checksum(&self.rx_buf[..self.len_pvlcd(msg_len)]),
        )?;

        Ok(msg_len)
    }

    /// Reads new input data and finds the position of the next message, if any, after
    /// possibly removing the previous message
    ///
    /// Returns start index and length of the next message which will be valid in the `rx_buf`
    /// until this method is called again. The `(index, length)` tuple needs to be returned instead
    /// of the message data slice (`&[u8]`) so that this method can be called in a loop in
    /// `blocking_read_msg` without borrow checker complaints.
    fn find_next_msg(&mut self) -> Result<(usize, usize), MqError> {
        self.rm_old_msg();
        self.read_bytes();
        self.try_advance_to_prefix()?;
        let msg_len = self.verify_full_msg()?;
        self.has_received_full_msg = true;
        Ok((self.len_pvlc(), self.len_pvlc() + msg_len))
    }

    /// Attempts to read a message from the queue in non-blocking mode.
    ///
    /// Returns a slice of the message data or an error if no complete message is available.
    pub fn read_or_fail(&mut self) -> Result<&[u8], MqError> {
        let (start, end) = self.find_next_msg()?;
        Ok(&self.rx_buf[start..end])
    }

    /// Attempts to read a message, blocking until a message is successfully read or an error
    /// occurs.
    pub fn read_blocking(&mut self) -> Result<&[u8], MqError> {
        loop {
            match self.find_next_msg() {
                Ok((start, end)) => return Ok(&self.rx_buf[start..end]),
                Err(MqError::MqFull | MqError::MqEmpty) => continue,
                Err(err) => return Err(err),
            }
        }
    }

    //
    // Write methods
    //

    /// *W*rite bytes *a*nd *c*alculate *c*rc
    fn wacc(&mut self, digest: &mut Digest<u32>, data: &[u8]) {
        self.byte_queue.write_or_fail(data).unwrap();
        digest.update(data);
    }

    /// Writes a full message, i.e. while calculating/updating the CRCs, writes the prefix, the
    /// protocol version, the message-length, the header crc, the data and the message CRC
    /// afterwards
    ///
    /// Calling this method requires that the underlying `ByteQueue` has enough space because all
    /// `ByteQueue` write attempts will be `unwrap`ped.
    fn write_msg(&mut self, msg_data: &[u8]) -> Result<(), MqError> {
        let mut header_crc = CRC32.digest();
        self.wacc(&mut header_crc, self.prefix);
        self.wacc(&mut header_crc, &PROTOCOL_VERSION.to_le_bytes());
        let msg_len_u32 = u32::try_from(msg_data.len()).map_err(|_| MqError::MqMsgTooBig)?;
        self.wacc(&mut header_crc, &msg_len_u32.to_le_bytes());
        // `header_crc.finalize()` takes ownership, thus clone it before to continue calculating
        // the whole crc
        let mut total_crc = header_crc.clone();
        let header_crc_bytes = header_crc.finalize().to_le_bytes();
        self.wacc(&mut total_crc, &header_crc_bytes);
        self.wacc(&mut total_crc, msg_data);
        let total_crc_bytes = total_crc.finalize().to_le_bytes();
        self.byte_queue.write_or_fail(&total_crc_bytes).unwrap();

        Ok(())
    }

    /// Attempts to write a message to the queue in non-blocking mode.
    ///
    /// Returns an error if the message is too large to fit into the `ByteQueue` buffer (`MqMsgTooBig`),
    /// or if there is insufficient space in the `ByteQueue` (`MqFull`).
    /// Otherwise, the message is written successfully.
    pub fn write_or_fail(&mut self, msg_data: &[u8]) -> Result<(), MqError> {
        if self.byte_queue.capacity() < self.len_pvlcdc(msg_data.len()) {
            return Err(MqError::MqMsgTooBig);
        }
        if self.byte_queue.space() < self.len_pvlcdc(msg_data.len()) {
            return Err(MqError::MqFull);
        }
        self.write_msg(msg_data)?;

        Ok(())
    }

    /// Writes a message to the queue in blocking mode.
    ///
    /// Returns an error if the message is too large to fit into the `ByteQueue` buffer (`MqMsgTooBig`).
    /// If there is insufficient space in the `ByteQueue`, this function will block until space becomes
    /// available.
    /// Once space is available, the message is written successfully.
    pub fn write_blocking(&mut self, msg_data: &[u8]) -> Result<(), MqError> {
        if self.byte_queue.capacity() < self.len_pvlcdc(msg_data.len()) {
            return Err(MqError::MqMsgTooBig);
        }
        while self.byte_queue.space() < self.len_pvlcdc(msg_data.len()) {}
        self.write_msg(msg_data)?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use crate::byte_queue::ByteQueue;
    use crate::msg_queue::{
        MqError, MsgQueue, MSG_CRC_FIELD_SIZE, MSG_PROTOCOL_FIELD_SIZE, MSG_SIZE_FIELD_SIZE,
    };

    const DEFAULT_PREFIX: &'static [u8] = b"DEFAULT_PREFIX: "; // 16 byte long

    #[test]
    fn test_skip_in_rx_buf() {
        let mut bq_buf = [0u32; 64];
        let mut msg_queue = unsafe {
            MsgQueue::new(
                ByteQueue::create(bq_buf.as_mut_ptr() as *mut u8, bq_buf.len() * 4),
                DEFAULT_PREFIX,
                [0u8; 64 * 4],
            )
        };

        let s = b"abcde";
        for skip in 0..=s.len() {
            msg_queue.rx_buf[..s.len()].copy_from_slice(s); // dest and src len must be equal
            msg_queue.rx_buf_len = s.len();

            msg_queue.skip_in_rx_buf(skip);
            assert_eq!(&msg_queue.rx_buf[..msg_queue.rx_buf_len], &s[skip..]);
            assert_eq!(msg_queue.rx_buf_len, s.len() - skip);
        }
    }

    #[test]
    fn test_invalid_msg_size() {
        let mut bq_buf = [0u32; 10]; // byte queue capacity: 10 * 4 - 8(ptrs) - 1 = 31
        let mut msg_queue = unsafe {
            MsgQueue::new(
                ByteQueue::create(bq_buf.as_mut_ptr() as *mut u8, bq_buf.len() * 4),
                DEFAULT_PREFIX,
                [0u8; 7], // not used for writing msgs into the byte queue
            )
        };

        let data = b"abcd";
        let msg_size = DEFAULT_PREFIX.len() // 16
            + MSG_PROTOCOL_FIELD_SIZE // 1
            + MSG_SIZE_FIELD_SIZE // 4
            + MSG_CRC_FIELD_SIZE // 4
            + data.len() // 4
            + MSG_CRC_FIELD_SIZE; // 4 => 16 + 1 + 4 * 4 = 33
        assert!(msg_queue.byte_queue.capacity() < msg_size);
        assert_eq!(msg_queue.write_or_fail(data), Err(MqError::MqMsgTooBig));

        let data = b"ab";
        let msg_size = DEFAULT_PREFIX.len() // 16
            + MSG_PROTOCOL_FIELD_SIZE // 1
            + MSG_SIZE_FIELD_SIZE // 4
            + MSG_CRC_FIELD_SIZE // 4
            + data.len() // 2
            + MSG_CRC_FIELD_SIZE; // 4 => 16 + 1 + 4 * 3 + 2 = 31
        assert!(msg_queue.byte_queue.capacity() == msg_size); // 10 * 4 - 8 - 1 = 31
        assert_eq!(msg_queue.write_or_fail(&[1, 2]), Ok(()));
    }

    #[test]
    fn test_read_empty_queue() {
        let mut bq_buf = [0u32; 64];
        let mut msg_queue = unsafe {
            MsgQueue::new(
                ByteQueue::create(bq_buf.as_mut_ptr() as *mut u8, bq_buf.len() * 4),
                DEFAULT_PREFIX,
                [0u8; 64 * 4],
            )
        };

        let result = msg_queue.read_or_fail();
        assert_eq!(result, Err(MqError::MqEmpty));
    }

    #[test]
    fn test_write_and_read_msg() {
        let mut bq_buf = [0u32; 64];
        let mut msg_queue = unsafe {
            MsgQueue::new(
                ByteQueue::create(bq_buf.as_mut_ptr() as *mut u8, bq_buf.len() * 4),
                DEFAULT_PREFIX,
                [0u8; 64 * 4],
            )
        };

        let msg = b"Hello, World!";
        let result = msg_queue.write_or_fail(msg);
        assert!(result.is_ok());

        let read_msg = msg_queue.read_or_fail().unwrap();
        assert_eq!(read_msg, msg);
    }

    #[test]
    fn test_crc_error() {
        let mut bq_buf = [0u32; 64];
        let mut msg_queue = unsafe {
            MsgQueue::new(
                ByteQueue::create(bq_buf.as_mut_ptr() as *mut u8, bq_buf.len() * 4),
                DEFAULT_PREFIX,
                [0u8; 64 * 4],
            )
        };
        let msg = b"xxxxyyyy";

        /* test the header CRC error */
        msg_queue.write_or_fail(msg).unwrap();
        // Invalidate the CRC: `bq_buf` is defined as `u32` array and in order get to the real
        // index of the crc, the index needs to be downscaled by 4 and skip 2 indices for the
        // length of the two internal pointers
        bq_buf[2 + msg_queue.len_pvl() / 4..].fill(0);
        assert_eq!(msg_queue.read_or_fail(), Err(MqError::MqCrcErr));
        // write and read should work as normal
        msg_queue.write_or_fail(msg).unwrap();
        assert_eq!(msg_queue.read_or_fail().unwrap(), msg);

        /* reinitialize the msg queue and test the second CRC error */
        msg_queue = unsafe {
            MsgQueue::new(
                ByteQueue::create(bq_buf.as_mut_ptr() as *mut u8, bq_buf.len() * 4),
                DEFAULT_PREFIX,
                [0u8; 64 * 4],
            )
        };
        msg_queue.write_or_fail(msg).unwrap();
        bq_buf[2 + msg_queue.len_pvlcd(msg.len()) / 4..].fill(0);
        assert_eq!(msg_queue.read_or_fail(), Err(MqError::MqCrcErr));
        msg_queue.write_blocking(msg).unwrap();
        assert_eq!(msg_queue.read_or_fail().unwrap(), msg);
    }

    #[test]
    fn test_saturate_queue() {
        let mut bq_buf = [0u32; 64];
        let mut msg_queue = unsafe {
            MsgQueue::new(
                ByteQueue::create(bq_buf.as_mut_ptr() as *mut u8, bq_buf.len() * 4),
                DEFAULT_PREFIX,
                [0u8; 64 * 4],
            )
        };

        /*
         * 4 byte long so that byte queue can be saturated after 7 writes:
         * (256 - 8(size of two 32-bit pointers) - 1(last byte)) / (16(prefix) + 4(length)
         *      + 1(procotol_version) + 4(crc) + 4(data) + 4(crc))
         */
        let data = b"abcd";
        let msg_size = DEFAULT_PREFIX.len()
            + MSG_PROTOCOL_FIELD_SIZE
            + MSG_SIZE_FIELD_SIZE
            + MSG_CRC_FIELD_SIZE
            + data.len()
            + MSG_CRC_FIELD_SIZE;
        let repeat = (bq_buf.len() * 4 - 2 * core::mem::size_of::<u32>() - 1) / msg_size;
        assert_eq!(repeat, 7);

        for _ in 0..repeat {
            let result = msg_queue.write_or_fail(data);
            assert_eq!(result, Ok(()));
        }
        assert_eq!(
            msg_queue.byte_queue.space(),
            (bq_buf.len() * 4 - 2 * core::mem::size_of::<u32>() - 1 - repeat * msg_size)
        );

        let result = msg_queue.write_or_fail(data);
        assert_eq!(result, Err(MqError::MqFull));
    }

    #[test]
    fn test_read_after_invalid_msg() {
        let mut bq_buf = [0u32; 64];
        let mut msg_queue = unsafe {
            MsgQueue::new(
                ByteQueue::create(bq_buf.as_mut_ptr() as *mut u8, bq_buf.len() * 4),
                DEFAULT_PREFIX,
                [0u8; 64 * 4],
            )
        };

        let msg = b"valid msg";
        msg_queue.write_or_fail(msg).unwrap();

        msg_queue.read_bytes();
        msg_queue.invalidate_current_msg();

        let result = msg_queue.read_or_fail();
        assert_eq!(result, Err(MqError::MqEmpty));
    }

    #[test]
    fn test_read_write_after_invalid_msg() {
        let mut bq_buf = [0u32; 64];
        let mut msg_queue = unsafe {
            MsgQueue::new(
                ByteQueue::create(bq_buf.as_mut_ptr() as *mut u8, bq_buf.len() * 4),
                DEFAULT_PREFIX,
                [0u8; 64 * 4],
            )
        };

        let msg = b"valid msg";
        msg_queue.write_or_fail(msg).unwrap();
        msg_queue.write_or_fail(msg).unwrap();

        msg_queue.read_bytes();
        msg_queue.invalidate_current_msg();

        let result = msg_queue.read_or_fail().unwrap();
        assert_eq!(result, msg);
    }

    #[test]
    fn test_blocking_read_msg() {
        let mut bq_buf = [0u32; 64];
        let mut msg_queue = unsafe {
            MsgQueue::new(
                ByteQueue::create(bq_buf.as_mut_ptr() as *mut u8, bq_buf.len() * 4),
                DEFAULT_PREFIX,
                [0u8; 64 * 4],
            )
        };
        let msg = b"Blocking Msg";

        msg_queue.write_blocking(msg).unwrap();

        let read_msg = msg_queue.read_blocking().unwrap();
        assert_eq!(read_msg, msg);
    }

    #[test]
    fn test_read_part_of_next_msg() {
        let mut bq_buf = [0u32; 128];
        let mut msg_queue = unsafe {
            MsgQueue::new(
                ByteQueue::create(bq_buf.as_mut_ptr() as *mut u8, bq_buf.len() * 4),
                DEFAULT_PREFIX,
                [0u8; 64], // rx_buf is intentionally too small to hold the full test messages
            )
        };

        let msg = b"valid msg";
        let garbage = [0xff; 128];

        for garbage_len in 64 - 20..64 {
            // write some garbage and a real message into the queue
            msg_queue
                .byte_queue
                .write_or_fail(&garbage[..garbage_len])
                .unwrap();
            msg_queue.write_or_fail(msg).unwrap();
            // Since the msg rx buf is too small, the first read attempt will only read the garbage
            // and not find a message
            assert_eq!(msg_queue.read_or_fail(), Err(MqError::MqEmpty));
            // The second read attempt will succeed
            assert_eq!(msg_queue.read_or_fail().unwrap(), msg);
        }
    }

    #[test]
    fn test_incompatible_protocol_version() {
        let mut bq_buf = [0u32; 128];
        let mut msg_queue = unsafe {
            MsgQueue::new(
                ByteQueue::create(bq_buf.as_mut_ptr() as *mut u8, bq_buf.len() * 4),
                DEFAULT_PREFIX,
                [0u8; 64 * 4],
            )
        };

        let msg = b"xxxxyyyy";
        msg_queue.write_blocking(msg).unwrap();
        let u8_slice: &mut [u8] =
            unsafe { std::slice::from_raw_parts_mut(bq_buf.as_mut_ptr() as *mut u8, 128 * 4) };
        u8_slice[8 + msg_queue.len_p()] = 2;
        assert_eq!(
            msg_queue.read_or_fail(),
            Err(MqError::MqWrongProtocolVersion)
        );
        // should continue to be able to read the next msg
        msg_queue.write_blocking(msg).unwrap();
        assert_eq!(msg_queue.read_or_fail().unwrap(), msg);
    }
}