shared_mem_queue/byte_queue.rs
// Copyright Open Logistics Foundation
//
// Licensed under the Open Logistics Foundation License 1.3.
// For details on the licensing terms, see the LICENSE file.
// SPDX-License-Identifier: OLFL-1.3

//! FIFO queue with a byte-oriented interface
//!
//! The `ByteQueue` operates on a shared memory region and keeps track of a write-pointer and a
//! read-pointer. To access both pointers from both processors, the pointers are stored in the
//! shared memory region itself, so the capacity of the queue is `2*size_of::<u32>()` smaller than
//! the memory region size.
//!
//! When initializing the `ByteQueue` from an array buffer, the buffer should be defined with the
//! type `u32` to ensure alignment regardless of its size, and then cast to a `u8` pointer when
//! passed into the `ByteQueue` constructor. Additionally, the size parameter for the constructor
//! needs to be adjusted accordingly, i.e. it is 4 times larger than the array length.
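//!
//! For illustration, a minimal sketch of this setup (mirroring the crate's own tests; in a real
//! deployment the buffer would be a dedicated shared memory region known to both processors):
//!
//! ```ignore
//! let mut buffer = [0u32; 64];
//! // The queue sees 64 * 4 = 256 bytes; 8 of them hold the two pointers.
//! let mut queue = unsafe {
//!     ByteQueue::create(
//!         buffer.as_mut_ptr() as *mut u8,
//!         buffer.len() * core::mem::size_of::<u32>(),
//!     )
//! };
//! ```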
//!
//! The main contract for the `ByteQueue` is that only the writer may change the
//! write-pointer and only the reader may change the read-pointer. The memory
//! region in front of the write-pointer and up to the read-pointer is owned by
//! the writer (it may be changed by the writer), the memory region in front of the read-pointer
//! and up to the write-pointer is owned by the reader (it may not be changed by the writer and can
//! safely be read by the reader). For initialization, both pointers have
//! to be set to 0 at the beginning. This is in contrast to the contract above because the
//! initializing processor needs to write both pointers. Therefore, this has to be done by processor
//! A while it is guaranteed that processor B does not access the queue yet, to prevent race
//! conditions.
//!
//! Because processor A has to initialize the byte queue and processor B should not
//! reset the write- and read-pointers, there are two methods for
//! initialization: [`ByteQueue::create`] should be called by the first processor and
//! sets both pointers to 0, while [`ByteQueue::attach`] should be called by the second one.
//!
//! The `ByteQueue` implements both the write- and the read-methods, but
//! each processor should have either the writing side or the reading side
//! assigned to it and must not call the other methods. It would also be
//! possible to have a `SharedMemWriter` and a `SharedMemReader`, but this
//! design was initially chosen so that the queue can also be used as a simple
//! ring buffer on a single processor.
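//!
//! A minimal single-processor round trip, with one `ByteQueue` instance per role (a sketch,
//! where `mem` and `mem_len` describe the shared region as above):
//!
//! ```ignore
//! let mut writer = unsafe { ByteQueue::create(mem, mem_len) };
//! let mut reader = unsafe { ByteQueue::attach(mem, mem_len) };
//!
//! writer.write_blocking(&[1, 2, 3, 4]);
//! let mut rx = [0u8; 4];
//! reader.consume_blocking(&mut rx);
//! assert_eq!(&rx, &[1, 2, 3, 4]);
//! ```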

use core::convert::{Infallible, TryFrom, TryInto};
use core::ptr::read_volatile;
use core::ptr::write_volatile;
use core::sync::atomic;
use core::sync::atomic::Ordering;

/// The `ByteQueue` queue type. Read the crate and module documentation for further information and
/// usage examples.
#[derive(Debug)]
pub struct ByteQueue {
    write_pos_ptr: *mut u32,
    read_pos_ptr: *mut u32,
    data_ptr: *mut u8,
    capacity: usize,
}
57
58/// The `ByteQueue` is not automatically `Send` because it contains raw pointers. According to
59/// [the Nomicon](https://doc.rust-lang.org/1.81.0/nomicon/send-and-sync.html), raw pointers
60/// could be considered "fine [...] to be marked as thread safe" but their "non-trivial untracked
61/// ownership" requires to decide manually if a type containing raw pointers is `Send`.
62///
63/// Regarding the `ByteQueue`, every instantiation and usage is so unsafe that the user needs to be
64/// careful anyway. If all usage requirements are still met, the `ByteQueue` can safely be used
65/// from another thread, too. Therefore, `Send` is implemented manually here to increase the
66/// flexibility for the users.
67///
68/// An other perspective on implementing `Send` is that the `ByteQueue` is fundamentally designed
69/// to be used for inter-processor communication which is in many ways equivalent to inter-thread
70/// operation. Thus, implementing `Send` does not introduce any new requirements.
71unsafe impl Send for ByteQueue {}

impl ByteQueue {
    /// Creates a new queue in the given memory region and initializes both pointers.
    ///
    /// # Safety
    /// This method has to be called before the other processor tries to access the queue,
    /// because otherwise the other processor might access an uninitialized memory region,
    /// which will most likely result in crashes.
    ///
    /// Obviously, the memory pointer and the memory region length must be correct, reserved for
    /// this purpose and known to the other processor.
    pub unsafe fn create(mem: *mut u8, mem_len: usize) -> Self {
        let mut slf = Self::attach(mem, mem_len);
        slf.set_write_pos(0);
        slf.set_read_pos(0);
        slf
    }
    /// Attaches to a queue which has previously been initialized by [`ByteQueue::create`],
    /// possibly by another processor.
    ///
    /// # Safety
    /// This method must not be called before the other processor has properly initialized the
    /// queue because this will most likely result in crashes.
    ///
    /// Obviously, the memory pointer and the memory region length must be correct, reserved for
    /// this purpose and known to the other processor.
    pub unsafe fn attach(mem: *mut u8, mem_len: usize) -> Self {
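        // Layout of the shared region: the first two u32 words hold the write
        // position and the read position; the data area follows directly after.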
        ByteQueue {
            write_pos_ptr: mem as *mut u32,
            read_pos_ptr: (mem as *mut u32).offset(1),
            data_ptr: mem.offset(
                isize::try_from(2 * core::mem::size_of::<u32>())
                    .expect("8 should be convertible to isize"),
            ),
            capacity: mem_len - 2 * core::mem::size_of::<u32>(),
        }
    }
    fn get_write_pos(&self) -> usize {
        unsafe { read_volatile(self.write_pos_ptr) as usize }
    }
    fn get_read_pos(&self) -> usize {
        unsafe { read_volatile(self.read_pos_ptr) as usize }
    }
    fn set_write_pos(&mut self, wpos: usize) {
        unsafe {
            write_volatile(
                self.write_pos_ptr,
                wpos.try_into().expect("cannot convert usize into u32"),
            )
        }
    }
    fn set_read_pos(&mut self, rpos: usize) {
        unsafe {
            write_volatile(
                self.read_pos_ptr,
                rpos.try_into().expect("cannot convert usize into u32"),
            )
        }
    }

    /// Returns the number of bytes that can currently be written into the queue.
    pub fn space(&self) -> usize {
        (self.capacity + self.get_read_pos() - self.get_write_pos() - 1) % self.capacity
    }

    /// Returns the total capacity of the queue. One byte of the data region is
    /// always kept free so that equal write and read positions unambiguously
    /// mean "empty", which is why this is one less than the size of the data
    /// region.
    pub fn capacity(&self) -> usize {
        self.capacity - 1
    }

    /// Returns the number of bytes currently stored in the queue, i.e. written data that is
    /// still to be consumed or read.
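    ///
    /// The position arithmetic is modular. For example, with a 16-byte data region, a write
    /// position of 2 and a read position of 12, the queue holds `(16 + 2 - 12) % 16 = 6`
    /// bytes, which wrap around the end of the region.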
    pub fn size(&self) -> usize {
        (self.capacity + self.get_write_pos() - self.get_read_pos()) % self.capacity
    }

    //
    // Write methods
    //

    /// Writes at most `data.len()` bytes into the byte queue; fewer if the currently
    /// available space in the byte queue is smaller. Returns the number of bytes written.
    ///
    /// Memory fences are used for proper synchronization.
    pub fn write_at_most(&mut self, data: &[u8]) -> usize {
        let len = data.len().min(self.space());

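        // Acquire fence: the volatile data writes below must not be reordered
        // before the load of the reader's position in `space()` above.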
        atomic::fence(Ordering::Acquire);
        let wpos = self.get_write_pos();
        for (i, byte) in data.iter().enumerate().take(len) {
            let offset = (wpos + i) % self.capacity;
            unsafe {
                let dptr = self.data_ptr.add(offset);
                write_volatile(dptr, *byte);
            }
        }
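        // Release fence: all payload writes above must be visible to the reader
        // before the new write position is published.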
        atomic::fence(Ordering::Release);
        let wpos = (wpos + len) % self.capacity;
        self.set_write_pos(wpos);

        len
    }

    /// Attempts to write data to the queue in non-blocking mode.
    ///
    /// If there is not enough space to write the entire data, returns an error (`WouldBlock`).
    /// On success, writes the data into the queue.
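    ///
    /// The error type follows the `nb` crate's conventions, so the call can be retried with
    /// its helpers, for example (a sketch, assuming an already initialized `queue`):
    ///
    /// ```ignore
    /// nb::block!(queue.write_or_fail(b"hello")).unwrap();
    /// ```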
    pub fn write_or_fail(&mut self, data: &[u8]) -> nb::Result<(), Infallible> {
        if self.space() < data.len() {
            return Err(nb::Error::WouldBlock);
        }
        self.write_at_most(data);
        Ok(())
    }

    /// Blocks until there is enough space in the queue to write the data.
    ///
    /// Once space is available, writes `data.len()` bytes of data to the queue.
    pub fn write_blocking(&mut self, data: &[u8]) {
        loop {
            if self.space() >= data.len() {
                break;
            }
        }
        self.write_at_most(data);
    }

    //
    // Skip methods
    //

    /// Skips at most `len` bytes; fewer if the amount of written data in the byte queue is
    /// smaller. Returns the number of bytes skipped.
    pub fn skip_at_most(&mut self, len: usize) -> usize {
        let len = len.min(self.size());
        self.set_read_pos((self.get_read_pos() + len) % self.capacity);

        len
    }

    /// Attempts to skip `len` bytes in non-blocking mode.
    ///
    /// If there is not enough data to be skipped, returns an error (`WouldBlock`).
    /// On success, skips `len` bytes of written data.
    pub fn skip_or_fail(&mut self, len: usize) -> nb::Result<(), Infallible> {
        if self.size() < len {
            return Err(nb::Error::WouldBlock);
        }
        self.skip_at_most(len);
        Ok(())
    }

    /// Blocks until there is enough data in the queue to be skipped.
    ///
    /// Once enough data is available, skips `len` bytes of data in the queue.
    pub fn skip_blocking(&mut self, len: usize) {
        loop {
            if self.size() >= len {
                break;
            }
        }
        self.skip_at_most(len);
    }

    //
    // Peek methods
    //

    /// Reads at most `len` bytes without removing them from the queue; fewer if the amount
    /// of written data in the byte queue is smaller. Returns the number of bytes read.
    pub fn peek_at_most(&self, buf: &mut [u8], len: usize) -> usize {
        let len = len.min(buf.len()).min(self.size());

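        // Acquire fence: the volatile data reads below must not be reordered
        // before the load of the writer's position in `size()` above.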
        atomic::fence(Ordering::Acquire);
        let rpos = self.get_read_pos();
        for (i, byte) in buf.iter_mut().enumerate().take(len) {
            let offset = (rpos + i) % self.capacity;
            unsafe {
                let dptr = self.data_ptr.add(offset);
                *byte = read_volatile(dptr);
            }
        }
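        // Release fence: the data reads above must complete before a new read
        // position is published via `skip_at_most`.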
        atomic::fence(Ordering::Release);

        // Here is where we would update the read position pointer in a consuming implementation.
        // Since consume = peek + skip, the skip function does not need memory fencing because it
        // is done here already.
        len
    }

    /// Attempts to fill the buffer completely with the data in the byte queue in non-blocking
    /// mode.
    ///
    /// If there is not enough data, returns an error (`WouldBlock`).
    /// On success, reads `buf.len()` bytes of written data without skipping them in the byte
    /// queue.
    pub fn peek_or_fail(&self, buf: &mut [u8]) -> nb::Result<(), Infallible> {
        if self.size() < buf.len() {
            return Err(nb::Error::WouldBlock);
        }
        self.peek_at_most(buf, buf.len());
        Ok(())
    }

    /// Blocks until there is enough data in the queue to fill the buffer completely.
    ///
    /// On success, reads `buf.len()` bytes of written data without skipping them in the byte
    /// queue.
    pub fn peek_blocking(&self, buf: &mut [u8]) {
        loop {
            if self.size() >= buf.len() {
                break;
            }
        }
        self.peek_at_most(buf, buf.len());
    }

    //
    // Consume methods
    //

    /// Reads up to the available data into the provided buffer, returning the number of bytes
    /// read.
    ///
    /// This method reads/consumes at most the size of the buffer or the amount of available data,
    /// whichever is smaller.
    pub fn consume_at_most(&mut self, buf: &mut [u8]) -> usize {
        let len = self.peek_at_most(buf, buf.len());

        self.skip_at_most(len)
    }

    /// Attempts to read data from the queue in non-blocking mode. If fewer than `buf.len()`
    /// bytes are available to be read, returns an error (`WouldBlock`).
    ///
    /// On success, reads/consumes `buf.len()` bytes from the queue into the provided buffer.
    pub fn consume_or_fail(&mut self, buf: &mut [u8]) -> nb::Result<(), Infallible> {
        self.peek_or_fail(buf)?;
        self.skip_at_most(buf.len());

        Ok(())
    }

    /// Blocks until there is enough data in the queue to fill the buffer completely.
    ///
    /// On success, reads/consumes `buf.len()` bytes from the queue into the provided buffer.
    pub fn consume_blocking(&mut self, buf: &mut [u8]) {
        self.peek_blocking(buf);
        self.skip_at_most(buf.len());
    }
}

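/// Implementing [`core::fmt::Write`] lets the queue be used with the `write!` and `writeln!`
/// macros, e.g. to forward formatted log output to the other processor. Note that `write_str`
/// blocks until enough space is available.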
impl core::fmt::Write for ByteQueue {
    fn write_str(&mut self, s: &str) -> Result<(), core::fmt::Error> {
        self.write_blocking(s.as_bytes());
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::ByteQueue;
    const LEN_U32_TO_U8_SCALER: usize = core::mem::size_of::<u32>();

    #[test]
    fn test_peek() {
        let mut buffer = [123u32; 17];
        let mut writer = unsafe {
            ByteQueue::create(
                buffer.as_mut_ptr() as *mut u8,
                buffer.len() * LEN_U32_TO_U8_SCALER,
            )
        };
        let mut reader = unsafe {
            ByteQueue::attach(
                buffer.as_mut_ptr() as *mut u8,
                buffer.len() * LEN_U32_TO_U8_SCALER,
            )
        };
        let tx = [1, 2, 3, 4];
        writer.write_or_fail(&tx).unwrap();

        let mut rx = [0u8; 4];
        reader.peek_or_fail(&mut rx).unwrap();
        assert_eq!(&tx, &rx);
        assert!(reader.size() == tx.len());
        for i in 0..1234 {
            reader.peek_at_most(&mut rx, i);
            assert_eq!(&tx[..i.min(tx.len())], &rx[..i.min(rx.len())]);
            assert!(reader.size() == tx.len());
        }

        reader.consume_or_fail(&mut rx).unwrap();
        assert_eq!(&tx, &rx);
        assert!(reader.size() == 0);
    }

    #[test]
    fn test_skip() {
        let mut buffer = [123u32; 55];
        let mut writer = unsafe {
            ByteQueue::create(
                buffer.as_mut_ptr() as *mut u8,
                buffer.len() * LEN_U32_TO_U8_SCALER,
            )
        };
        let mut reader = unsafe {
            ByteQueue::attach(
                buffer.as_mut_ptr() as *mut u8,
                buffer.len() * LEN_U32_TO_U8_SCALER,
            )
        };

        let data = [0xffu8; 10];
        let sum_to_ten = 55;
        for i in 0..=10 {
            writer.write_at_most(&data[..i]);
        }

        let mut skipped = 0;
        for i in 0..=10 {
            reader.skip_or_fail(i).unwrap();
            skipped += i;
            assert_eq!(reader.size(), sum_to_ten - skipped);
        }
    }

    #[test]
    fn write_read() {
        let mut buffer = [123u32; 17];
        let mut writer = unsafe {
            ByteQueue::create(
                buffer.as_mut_ptr() as *mut u8,
                buffer.len() * LEN_U32_TO_U8_SCALER,
            )
        };
        let mut reader = unsafe {
            ByteQueue::attach(
                buffer.as_mut_ptr() as *mut u8,
                buffer.len() * LEN_U32_TO_U8_SCALER,
            )
        };
        let tx = [1, 2, 3, 4];
        writer.write_blocking(&tx);
        let mut rx = [0u8; 4];
        reader.consume_blocking(&mut rx);
        assert_eq!(&tx, &rx);
    }
}
419}