shared_mem_queue/
byte_queue.rs

1// Copyright Open Logistics Foundation
2//
3// Licensed under the Open Logistics Foundation License 1.3.
4// For details on the licensing terms, see the LICENSE file.
5// SPDX-License-Identifier: OLFL-1.3
6
7//! FIFO queue with a byte-oriented interface
8//!
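//! The `ByteQueue` operates on a shared memory region and keeps track of a write-pointer and a
//! read-pointer. To access both pointers from both processors, the pointers are stored in the
//! shared memory region itself, so the data area of the queue is `2*size_of::<u32>()` bytes
//! smaller than the memory region. One byte of the data area always stays unused so that a full
//! queue can be told apart from an empty one, i.e. the usable capacity of the queue is
//! `2*size_of::<u32>() + 1` bytes smaller than the memory region size.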
13//!
//! When initializing the `ByteQueue` from an array buffer, the buffer should be defined with the
//! type `u32` to ensure alignment regardless of its size, and then cast into a `u8` pointer when
//! passed into the `ByteQueue` constructor. Additionally, the size parameter for the constructor
//! needs to be adjusted accordingly, i.e. it is 4 times larger than the array length.
18//!
19//! The main contract for the `ByteQueue` is that only the writer may write to the
20//! write-pointer, only the reader may change the read-pointer. The memory
21//! region in front of the write-pointer and up to the read-pointer is owned by
22//! the writer (it may be changed by the writer), the memory region in front of the read-pointer
23//! and up to the write-pointer is owned by the reader (it may not be changed by the writer and can
24//! safely be read by the reader). For initialization, both pointers have
25//! to be set to 0 at the beginning. This is in contrast to the contract above because the
26//! initializing processor needs to write both pointers. Therefore, this has to be done by processor
27//! A while it is guaranteed that processor B does not access the queue yet to prevent race
28//! conditions.
29//!
30//! Because processor A has to initialize the byte queue and processor B should not
31//! reset the write- and read-pointers, there are two methods for
32//! initialization: [`ByteQueue::create`] should be called by the first processor and
33//! sets both pointers to 0, [`ByteQueue::attach`] should be called by the second one.
34//!
35//! The `ByteQueue` implements both the write- and the read-methods but
36//! each processor should have either the writing side or the reading side
37//! assigned to it and must not call the other methods. It would also be
38//! possible to have a `SharedMemWriter` and a `SharedMemReader` but this
39//! design was initially chosen so that the queue can also be used as a simple
40//! ring buffer on a single processor.
41
42use core::convert::{Infallible, TryFrom, TryInto};
43use core::ptr::read_volatile;
44use core::ptr::write_volatile;
45use core::sync::atomic;
46use core::sync::atomic::Ordering;
47
/// The `ByteQueue` queue type. Read the crate and module documentation for further information and
/// usage examples.
#[derive(Debug)]
pub struct ByteQueue {
    /// Location of the write position, a `u32` stored at the very start of the memory region.
    write_pos_ptr: *mut u32,
    /// Location of the read position, the `u32` stored directly after the write position.
    read_pos_ptr: *mut u32,
    /// Start of the data area, located directly after the two position words.
    data_ptr: *mut u8,
    /// Length of the data area in bytes, i.e. the memory region length minus the two position
    /// words.
    capacity: usize,
}
57
/// The `ByteQueue` is not automatically `Send` because it contains raw pointers. According to
/// [the Nomicon](https://doc.rust-lang.org/1.81.0/nomicon/send-and-sync.html), raw pointers
/// could be considered "fine [...] to be marked as thread safe" but their "non-trivial untracked
/// ownership" requires deciding manually whether a type containing raw pointers is `Send`.
///
/// Regarding the `ByteQueue`, every instantiation and usage is so unsafe that the user needs to be
/// careful anyway. If all usage requirements are still met, the `ByteQueue` can safely be used
/// from another thread, too. Therefore, `Send` is implemented manually here to increase the
/// flexibility for the users.
///
/// Another perspective on implementing `Send` is that the `ByteQueue` is fundamentally designed
/// to be used for inter-processor communication which is in many ways equivalent to inter-thread
/// operation. Thus, implementing `Send` does not introduce any new requirements.
unsafe impl Send for ByteQueue {}
72
73impl ByteQueue {
74    /// Creates a new queue in the given memory region and initializes both pointers.
75    ///
76    /// # Safety
77    /// This method has to be called before the other processor tries to access the queue because
78    /// the other processor might access an uninitialized memory region otherwise which will most
79    /// likely result in crashes.
80    ///
81    /// Obviously, the memory pointer and the memory region length must be correct, reserved for
82    /// this purpose and known to the other processor.
83    pub unsafe fn create(mem: *mut u8, mem_len: usize) -> Self {
84        let mut slf = Self::attach(mem, mem_len);
85        slf.set_write_pos(0);
86        slf.set_read_pos(0);
87        slf
88    }
89    /// Attaches to a queue which has previously been initialized by [`ByteQueue::create`],
90    /// possibly by an other processor.
91    ///
92    /// # Safety
93    /// This method must not be called before the other processor has properly initialized the
94    /// queue because this will most likely result in crashes.
95    ///
96    /// Obviously, the memory pointer rand the memory region length must be correct, reserved for
97    /// this purpose and known to the other processor.
98    pub unsafe fn attach(mem: *mut u8, mem_len: usize) -> Self {
99        ByteQueue {
100            write_pos_ptr: mem as *mut u32,
101            read_pos_ptr: (mem as *mut u32).offset(1),
102            data_ptr: mem.offset(
103                isize::try_from(2 * core::mem::size_of::<u32>())
104                    .expect("~8u should be convertible to isize"),
105            ),
106            capacity: mem_len - 2 * core::mem::size_of::<u32>(),
107        }
108    }
109    fn get_write_pos(&self) -> usize {
110        unsafe { read_volatile(self.write_pos_ptr) as usize }
111    }
112    fn get_read_pos(&self) -> usize {
113        unsafe { read_volatile(self.read_pos_ptr) as usize }
114    }
115    fn set_write_pos(&mut self, wpos: usize) {
116        unsafe {
117            write_volatile(
118                self.write_pos_ptr,
119                wpos.try_into().expect("cannot convert usize into u32"),
120            )
121        }
122    }
123    fn set_read_pos(&mut self, rpos: usize) {
124        unsafe {
125            write_volatile(
126                self.read_pos_ptr,
127                rpos.try_into().expect("cannot convert usize into u32"),
128            )
129        }
130    }
131
132    /// Returns the size of the available space, which can be written into the queue.
133    pub fn space(&self) -> usize {
134        (self.capacity + self.get_read_pos() - self.get_write_pos() - 1) % self.capacity
135    }
136
137    pub fn capacity(&self) -> usize {
138        self.capacity - 1
139    }
140
141    /// Returns the size of the written messages, which are to be consumed or read.
142    pub fn size(&self) -> usize {
143        (self.capacity + self.get_write_pos() - self.get_read_pos()) % self.capacity
144    }
145
146    //
147    // Write methods
148    //
149
150    /// Writes at most `len` bytes into the byte queue, or less depending on the given size of
151    /// the data to be written, *and* the currently available space in the byte queue.
152    ///
153    /// Memory fences are used for proper synchronization.
154    pub fn write_at_most(&mut self, data: &[u8]) -> usize {
155        let len = data.len().min(self.space());
156
157        atomic::fence(Ordering::Acquire);
158        let wpos = self.get_write_pos();
159        for (i, byte) in data.iter().enumerate().take(len) {
160            let offset = (wpos + i) % self.capacity;
161            unsafe {
162                let dptr = self.data_ptr.add(offset);
163                write_volatile(dptr, *byte);
164            }
165        }
166        atomic::fence(Ordering::Release);
167        let wpos = (wpos + len) % self.capacity;
168        self.set_write_pos(wpos);
169
170        len
171    }
172
173    /// Attempts to write data to the queue in non-blocking mode.
174    ///
175    /// If there is not enough space to write the entire data, returns an error (`WouldBlock`).
176    /// On success, writes the data into the queue.
177    pub fn write_or_fail(&mut self, data: &[u8]) -> nb::Result<(), Infallible> {
178        if self.space() < data.len() {
179            return Err(nb::Error::WouldBlock);
180        }
181        self.write_at_most(data);
182        Ok(())
183    }
184
185    /// Blocks until there is enough space in the queue to write the data.
186    ///
187    /// Once space is available, writes `data.len()` bytes of data to the queue.
188    pub fn write_blocking(&mut self, data: &[u8]) {
189        loop {
190            if self.space() >= data.len() {
191                break;
192            }
193        }
194        self.write_at_most(data);
195    }
196
197    //
198    // Skip methods
199    //
200
201    /// Skips at most `len` bytes, or less depending on the size of the written data in the byte
202    /// queue.
203    pub fn skip_at_most(&mut self, len: usize) -> usize {
204        let len = len.min(self.size());
205        self.set_read_pos((self.get_read_pos() + len) % self.capacity);
206
207        len
208    }
209
210    /// Attempts to skip `len` bytes in non-blocking mode.
211    ///
212    /// If there is not enough data to be skipped, returns an error (`WouldBlock`).
213    /// On success, skips `len` bytes of written data.
214    pub fn skip_or_fail(&mut self, len: usize) -> nb::Result<(), Infallible> {
215        if self.size() < len {
216            return Err(nb::Error::WouldBlock);
217        }
218        self.skip_at_most(len);
219        Ok(())
220    }
221
222    /// Blocks until there is enough data in the queue to be skipped.
223    ///
224    /// Once enough data is available, skips `len` bytes of data in the queue.
225    pub fn skip_blocking(&mut self, len: usize) {
226        loop {
227            if self.size() >= len {
228                break;
229            }
230        }
231        self.skip_at_most(len);
232    }
233
234    //
235    // Peek methods
236    //
237
238    /// Read at most `len` bytes without losing them in the queue, or less depending on the
239    /// size of the written data in the byte queue.
240    pub fn peek_at_most(&self, buf: &mut [u8], len: usize) -> usize {
241        let len = len.min(buf.len()).min(self.size());
242
243        atomic::fence(Ordering::Acquire);
244        let rpos = self.get_read_pos();
245        for (i, byte) in buf.iter_mut().enumerate().take(len) {
246            let offset = (rpos + i) % self.capacity;
247            unsafe {
248                let dptr = self.data_ptr.add(offset);
249                *byte = read_volatile(dptr);
250            }
251        }
252        atomic::fence(Ordering::Release);
253
254        // Here is where we would update the read position pointer in a consuming implementation.
255        // Since consume = peek + skip, the skip function does not need memory fencing because it
256        // is done here already.
257        len
258    }
259
260    /// Attempts to fill the buffer completely with the data in the byte queue in non-blocking
261    /// mode.
262    ///
263    /// If there is not enough data, returns an error (`WouldBlock`).
264    /// On success, read `buf.len()` bytes of written data without skipping them in the byte
265    /// queue.
266    pub fn peek_or_fail(&self, buf: &mut [u8]) -> nb::Result<(), Infallible> {
267        if self.size() < buf.len() {
268            return Err(nb::Error::WouldBlock);
269        }
270        self.peek_at_most(buf, buf.len());
271        Ok(())
272    }
273
274    /// Blocks until there is enough data in the queue to fill the buffer completely.
275    ///
276    /// On success, read `buf.len()` bytes of written data without skipping them in the byte
277    /// queue.
278    pub fn peek_blocking(&self, buf: &mut [u8]) {
279        loop {
280            if self.size() >= buf.len() {
281                break;
282            }
283        }
284        self.peek_at_most(buf, buf.len());
285    }
286
287    //
288    // Consume methods
289    //
290
291    /// Reads up to the available data into the provided buffer, returning the number of bytes read.
292    ///
293    /// This method reads/consumes at most the size of the buffer or the amount of available data,
294    /// whichever is smaller.
295    pub fn consume_at_most(&mut self, buf: &mut [u8]) -> usize {
296        let len = self.peek_at_most(buf, buf.len());
297
298        self.skip_at_most(len)
299    }
300
301    /// Attempts to read data from the queue in non-blocking mode. If there is not enough data in
302    /// `buf.len()` size available to be read, returns an error (`WouldBlock`).
303    ///
304    /// On success, reads/consumes the data in `buf.len()` size from the queue into
305    /// the provided buffer.
306    pub fn consume_or_fail(&mut self, buf: &mut [u8]) -> nb::Result<(), Infallible> {
307        self.peek_or_fail(buf)?;
308        self.skip_at_most(buf.len());
309
310        Ok(())
311    }
312
313    /// Blocks until there is enough data in the queue to fill the buffer completely.
314    ///
315    /// On success, reads/consumes the data in `buf.len()` size from the queue into
316    /// the provided buffer.
317    pub fn consume_blocking(&mut self, buf: &mut [u8]) {
318        self.peek_blocking(buf);
319        self.skip_at_most(buf.len());
320    }
321}
322
323impl core::fmt::Write for ByteQueue {
324    fn write_str(&mut self, s: &str) -> Result<(), core::fmt::Error> {
325        self.write_blocking(s.as_bytes());
326        Ok(())
327    }
328}
329
#[cfg(test)]
mod tests {
    use super::ByteQueue;
    const LEN_U32_TO_U8_SCALER: usize = core::mem::size_of::<u32>();

    /// Creates a writer handle and a reader handle on top of `buffer`.
    ///
    /// The raw pointer is derived from the buffer exactly once and shared by both handles.
    /// Calling `as_mut_ptr` separately for each handle would reborrow the buffer mutably a
    /// second time, which invalidates the pointer held by the first handle under Rust's aliasing
    /// rules.
    fn queue_pair(buffer: &mut [u32]) -> (ByteQueue, ByteQueue) {
        let mem = buffer.as_mut_ptr().cast::<u8>();
        let mem_len = buffer.len() * LEN_U32_TO_U8_SCALER;
        // SAFETY: `mem` points to `mem_len` bytes which are aligned for `u32`, the callers keep
        // the buffer alive and untouched while the handles are used, and the writer handle is
        // created (initializing the positions) before the reader handle is attached.
        unsafe {
            let writer = ByteQueue::create(mem, mem_len);
            let reader = ByteQueue::attach(mem, mem_len);
            (writer, reader)
        }
    }

    #[test]
    fn test_peek() {
        let mut buffer = [123u32; 17];
        let (mut writer, mut reader) = queue_pair(&mut buffer);

        let tx = [1u8, 2, 3, 4];
        writer.write_or_fail(&tx).unwrap();

        let mut rx = [0u8; 4];
        reader.peek_or_fail(&mut rx).unwrap();
        assert_eq!(&tx, &rx);
        assert!(reader.size() == tx.len());
        // Peeking must never consume anything, no matter how many bytes are requested.
        for i in 0..1234 {
            reader.peek_at_most(&mut rx, i);
            assert_eq!(&tx[..i.min(tx.len())], &rx[..i.min(rx.len())]);
            assert!(reader.size() == tx.len());
        }

        reader.consume_or_fail(&mut rx).unwrap();
        assert_eq!(&tx, &rx);
        assert!(reader.size() == 0);
    }

    #[test]
    fn test_skip() {
        let mut buffer = [123u32; 55];
        let (mut writer, mut reader) = queue_pair(&mut buffer);

        let data = [0xffu8; 10];
        let sum_to_ten = 55;
        for i in 0..=10 {
            writer.write_at_most(&data[..i]);
        }

        let mut skipped = 0;
        for i in 0..=10 {
            reader.skip_or_fail(i).unwrap();
            skipped += i;
            assert_eq!(reader.size(), sum_to_ten - skipped);
        }
    }

    #[test]
    fn write_read() {
        let mut buffer = [123u32; 17];
        let (mut writer, mut reader) = queue_pair(&mut buffer);

        let tx = [1u8, 2, 3, 4];
        writer.write_blocking(&tx);
        let mut rx = [0u8; 4];
        reader.consume_blocking(&mut rx);
        assert_eq!(&tx, &rx);
    }

    #[test]
    fn wrap_around_and_overfill() {
        // 16 bytes of memory: 8 bytes for the positions, 8 bytes of data, 7 of them usable.
        let mut buffer = [0u32; 4];
        let (mut writer, mut reader) = queue_pair(&mut buffer);
        assert_eq!(writer.capacity(), 7);
        assert_eq!(writer.space(), 7);

        // Repeated rounds force the positions to wrap around the end of the data area.
        for round in 0..10u8 {
            let tx = [round, round + 1, round + 2, round + 3, round + 4];
            writer.write_or_fail(&tx).unwrap();
            assert_eq!(reader.size(), tx.len());

            let mut rx = [0u8; 5];
            reader.consume_or_fail(&mut rx).unwrap();
            assert_eq!(tx, rx);
            assert_eq!(reader.size(), 0);
            assert_eq!(writer.space(), 7);
        }

        // More data than fits is rejected as a whole or truncated, depending on the method.
        assert!(writer.write_or_fail(&[0u8; 8]).is_err());
        assert_eq!(writer.write_at_most(&[0u8; 10]), 7);
        assert_eq!(writer.space(), 0);
        assert_eq!(reader.size(), 7);
    }
}