bbqueue_sync/bbbuffer.rs
1use crate::{
2 framed::{FrameConsumer, FrameProducer},
3 Error, Result,
4};
5use core::{
6 cell::UnsafeCell,
7 cmp::min,
8 marker::PhantomData,
9 mem::{forget, transmute, MaybeUninit},
10 ops::{Deref, DerefMut},
11 ptr::NonNull,
12 result::Result as CoreResult,
13 slice::from_raw_parts_mut,
14 sync::atomic::{
15 AtomicBool, AtomicUsize,
16 Ordering::{AcqRel, Acquire, Release},
17 },
18};
19#[derive(Debug)]
20/// A backing structure for a BBQueue. Can be used to create either
21/// a BBQueue or a split Producer/Consumer pair
22pub struct BBBuffer<const N: usize> {
23 buf: UnsafeCell<MaybeUninit<[u8; N]>>,
24
25 /// Where the next byte will be written
26 write: AtomicUsize,
27
28 /// Where the next byte will be read from
29 read: AtomicUsize,
30
31 /// Used in the inverted case to mark the end of the
32 /// readable streak. Otherwise will == sizeof::<self.buf>().
33 /// Writer is responsible for placing this at the correct
34 /// place when entering an inverted condition, and Reader
35 /// is responsible for moving it back to sizeof::<self.buf>()
36 /// when exiting the inverted condition
37 last: AtomicUsize,
38
39 /// Used by the Writer to remember what bytes are currently
40 /// allowed to be written to, but are not yet ready to be
41 /// read from
42 reserve: AtomicUsize,
43
44 /// Is there an active read grant?
45 read_in_progress: AtomicBool,
46
47 /// Is there an active write grant?
48 write_in_progress: AtomicBool,
49
50 /// Have we already split?
51 already_split: AtomicBool,
52}
53
54unsafe impl<const A: usize> Sync for BBBuffer<A> {}
55
56impl<'a, const N: usize> BBBuffer<N> {
57 /// Attempt to split the `BBBuffer` into `Consumer` and `Producer` halves to gain access to the
58 /// buffer. If buffer has already been split, an error will be returned.
59 ///
60 /// NOTE: When splitting, the underlying buffer will be explicitly initialized
61 /// to zero. This may take a measurable amount of time, depending on the size
62 /// of the buffer. This is necessary to prevent undefined behavior. If the buffer
63 /// is placed at `static` scope within the `.bss` region, the explicit initialization
64 /// will be elided (as it is already performed as part of memory initialization)
65 ///
66 /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section
67 /// while splitting.
68 ///
69 /// ```rust
70 /// # // bbqueue test shim!
71 /// # fn bbqtest() {
72 /// use bbqueue::BBBuffer;
73 ///
74 /// // Create and split a new buffer
75 /// let buffer: BBBuffer<6> = BBBuffer::new();
76 /// let (prod, cons) = buffer.try_split().unwrap();
77 ///
78 /// // Not possible to split twice
79 /// assert!(buffer.try_split().is_err());
80 /// # // bbqueue test shim!
81 /// # }
82 /// #
83 /// # fn main() {
84 /// # #[cfg(not(feature = "thumbv6"))]
85 /// # bbqtest();
86 /// # }
87 /// ```
88 pub fn try_split(&'a self) -> Result<(Producer<'a, N>, Consumer<'a, N>)> {
89 if atomic::swap(&self.already_split, true, AcqRel) {
90 return Err(Error::AlreadySplit);
91 }
92
93 unsafe {
94 // Explicitly zero the data to avoid undefined behavior.
95 // This is required, because we hand out references to the buffers,
96 // which mean that creating them as references is technically UB for now
97 let mu_ptr = self.buf.get();
98 (*mu_ptr).as_mut_ptr().write_bytes(0u8, 1);
99
100 let nn1 = NonNull::new_unchecked(self as *const _ as *mut _);
101 let nn2 = NonNull::new_unchecked(self as *const _ as *mut _);
102
103 Ok((
104 Producer {
105 bbq: nn1,
106 pd: PhantomData,
107 },
108 Consumer {
109 bbq: nn2,
110 pd: PhantomData,
111 },
112 ))
113 }
114 }
115
116 /// Attempt to split the `BBBuffer` into `FrameConsumer` and `FrameProducer` halves
117 /// to gain access to the buffer. If buffer has already been split, an error
118 /// will be returned.
119 ///
120 /// NOTE: When splitting, the underlying buffer will be explicitly initialized
121 /// to zero. This may take a measurable amount of time, depending on the size
122 /// of the buffer. This is necessary to prevent undefined behavior. If the buffer
123 /// is placed at `static` scope within the `.bss` region, the explicit initialization
124 /// will be elided (as it is already performed as part of memory initialization)
125 ///
126 /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
127 /// section while splitting.
128 pub fn try_split_framed(&'a self) -> Result<(FrameProducer<'a, N>, FrameConsumer<'a, N>)> {
129 let (producer, consumer) = self.try_split()?;
130 Ok((FrameProducer { producer }, FrameConsumer { consumer }))
131 }
132
133 /// Attempt to release the Producer and Consumer
134 ///
135 /// This re-initializes the buffer so it may be split in a different mode at a later
136 /// time. There must be no read or write grants active, or an error will be returned.
137 ///
138 /// The `Producer` and `Consumer` must be from THIS `BBBuffer`, or an error will
139 /// be returned.
140 ///
141 /// ```rust
142 /// # // bbqueue test shim!
143 /// # fn bbqtest() {
144 /// use bbqueue::BBBuffer;
145 ///
146 /// // Create and split a new buffer
147 /// let buffer: BBBuffer<6> = BBBuffer::new();
148 /// let (prod, cons) = buffer.try_split().unwrap();
149 ///
150 /// // Not possible to split twice
151 /// assert!(buffer.try_split().is_err());
152 ///
153 /// // Release the producer and consumer
154 /// assert!(buffer.try_release(prod, cons).is_ok());
155 ///
156 /// // Split the buffer in framed mode
157 /// let (fprod, fcons) = buffer.try_split_framed().unwrap();
158 /// # // bbqueue test shim!
159 /// # }
160 /// #
161 /// # fn main() {
162 /// # #[cfg(not(feature = "thumbv6"))]
163 /// # bbqtest();
164 /// # }
165 /// ```
166 pub fn try_release(
167 &'a self,
168 prod: Producer<'a, N>,
169 cons: Consumer<'a, N>,
170 ) -> CoreResult<(), (Producer<'a, N>, Consumer<'a, N>)> {
171 // Note: Re-entrancy is not possible because we require ownership
172 // of the producer and consumer, which are not cloneable. We also
173 // can assume the buffer has been split, because
174
175 // Are these our producers and consumers?
176 let our_prod = prod.bbq.as_ptr() as *const Self == self;
177 let our_cons = cons.bbq.as_ptr() as *const Self == self;
178
179 if !(our_prod && our_cons) {
180 // Can't release, not our producer and consumer
181 return Err((prod, cons));
182 }
183
184 let wr_in_progress = self.write_in_progress.load(Acquire);
185 let rd_in_progress = self.read_in_progress.load(Acquire);
186
187 if wr_in_progress || rd_in_progress {
188 // Can't release, active grant(s) in progress
189 return Err((prod, cons));
190 }
191
192 // Drop the producer and consumer halves
193 drop(prod);
194 drop(cons);
195
196 // Re-initialize the buffer (not totally needed, but nice to do)
197 self.write.store(0, Release);
198 self.read.store(0, Release);
199 self.reserve.store(0, Release);
200 self.last.store(0, Release);
201
202 // Mark the buffer as ready to be split
203 self.already_split.store(false, Release);
204
205 Ok(())
206 }
207
208 /// Attempt to release the Producer and Consumer in Framed mode
209 ///
210 /// This re-initializes the buffer so it may be split in a different mode at a later
211 /// time. There must be no read or write grants active, or an error will be returned.
212 ///
213 /// The `FrameProducer` and `FrameConsumer` must be from THIS `BBBuffer`, or an error
214 /// will be returned.
215 pub fn try_release_framed(
216 &'a self,
217 prod: FrameProducer<'a, N>,
218 cons: FrameConsumer<'a, N>,
219 ) -> CoreResult<(), (FrameProducer<'a, N>, FrameConsumer<'a, N>)> {
220 self.try_release(prod.producer, cons.consumer)
221 .map_err(|(producer, consumer)| {
222 // Restore the wrapper types
223 (FrameProducer { producer }, FrameConsumer { consumer })
224 })
225 }
226}
227
228impl<const A: usize> BBBuffer<A> {
229 /// Create a new constant inner portion of a `BBBuffer`.
230 ///
231 /// NOTE: This is only necessary to use when creating a `BBBuffer` at static
232 /// scope, and is generally never used directly. This process is necessary to
233 /// work around current limitations in `const fn`, and will be replaced in
234 /// the future.
235 ///
236 /// ```rust,no_run
237 /// use bbqueue::BBBuffer;
238 ///
239 /// static BUF: BBBuffer<6> = BBBuffer::new();
240 ///
241 /// fn main() {
242 /// let (prod, cons) = BUF.try_split().unwrap();
243 /// }
244 /// ```
245 pub const fn new() -> Self {
246 Self {
247 // This will not be initialized until we split the buffer
248 buf: UnsafeCell::new(MaybeUninit::uninit()),
249
250 /// Owned by the writer
251 write: AtomicUsize::new(0),
252
253 /// Owned by the reader
254 read: AtomicUsize::new(0),
255
256 /// Cooperatively owned
257 ///
258 /// NOTE: This should generally be initialized as size_of::<self.buf>(), however
259 /// this would prevent the structure from being entirely zero-initialized,
260 /// and can cause the .data section to be much larger than necessary. By
261 /// forcing the `last` pointer to be zero initially, we place the structure
262 /// in an "inverted" condition, which will be resolved on the first commited
263 /// bytes that are written to the structure.
264 ///
265 /// When read == last == write, no bytes will be allowed to be read (good), but
266 /// write grants can be given out (also good).
267 last: AtomicUsize::new(0),
268
269 /// Owned by the Writer, "private"
270 reserve: AtomicUsize::new(0),
271
272 /// Owned by the Reader, "private"
273 read_in_progress: AtomicBool::new(false),
274
275 /// Owned by the Writer, "private"
276 write_in_progress: AtomicBool::new(false),
277
278 /// We haven't split at the start
279 already_split: AtomicBool::new(false),
280 }
281 }
282}
283
284/// `Producer` is the primary interface for pushing data into a `BBBuffer`.
285/// There are various methods for obtaining a grant to write to the buffer, with
286/// different potential tradeoffs. As all grants are required to be a contiguous
287/// range of data, different strategies are sometimes useful when making the decision
288/// between maximizing usage of the buffer, and ensuring a given grant is successful.
289///
290/// As a short summary of currently possible grants:
291///
292/// * `grant_exact(N)`
293/// * User will receive a grant `sz == N` (or receive an error)
294/// * This may cause a wraparound if a grant of size N is not available
295/// at the end of the ring.
296/// * If this grant caused a wraparound, the bytes that were "skipped" at the
297/// end of the ring will not be available until the reader reaches them,
298/// regardless of whether the grant commited any data or not.
299/// * Maximum possible waste due to skipping: `N - 1` bytes
300/// * `grant_max_remaining(N)`
301/// * User will receive a grant `0 < sz <= N` (or receive an error)
302/// * This will only cause a wrap to the beginning of the ring if exactly
303/// zero bytes are available at the end of the ring.
304/// * Maximum possible waste due to skipping: 0 bytes
305///
306/// See [this github issue](https://github.com/jamesmunns/bbqueue/issues/38) for a
307/// discussion of grant methods that could be added in the future.
308pub struct Producer<'a, const N: usize> {
309 bbq: NonNull<BBBuffer<N>>,
310 pd: PhantomData<&'a ()>,
311}
312
313unsafe impl<'a, const N: usize> Send for Producer<'a, N> {}
314unsafe impl<'a, const N: usize> Sync for Producer<'a, N> {}
315
316impl<'a, const N: usize> Producer<'a, N> {
317 /// Request a writable, contiguous section of memory of exactly
318 /// `sz` bytes. If the buffer size requested is not available,
319 /// an error will be returned.
320 ///
321 /// This method may cause the buffer to wrap around early if the
322 /// requested space is not available at the end of the buffer, but
323 /// is available at the beginning
324 ///
325 /// ```rust
326 /// # // bbqueue test shim!
327 /// # fn bbqtest() {
328 /// use bbqueue::BBBuffer;
329 ///
330 /// // Create and split a new buffer of 6 elements
331 /// let buffer: BBBuffer<6> = BBBuffer::new();
332 /// let (mut prod, cons) = buffer.try_split().unwrap();
333 ///
334 /// // Successfully obtain and commit a grant of four bytes
335 /// let mut grant = prod.grant_exact(4).unwrap();
336 /// assert_eq!(grant.buf().len(), 4);
337 /// grant.commit(4);
338 ///
339 /// // Try to obtain a grant of three bytes
340 /// assert!(prod.grant_exact(3).is_err());
341 /// # // bbqueue test shim!
342 /// # }
343 /// #
344 /// # fn main() {
345 /// # #[cfg(not(feature = "thumbv6"))]
346 /// # bbqtest();
347 /// # }
348 /// ```
349 pub fn grant_exact(&self, sz: usize) -> Result<GrantW<'a, N>> {
350 let inner = unsafe { &self.bbq.as_ref() };
351
352 if atomic::swap(&inner.write_in_progress, true, AcqRel) {
353 return Err(Error::GrantInProgress);
354 }
355
356 // Writer component. Must never write to `read`,
357 // be careful writing to `load`
358 let write = inner.write.load(Acquire);
359 let read = inner.read.load(Acquire);
360 let max = N;
361 let already_inverted = write < read;
362
363 let start = if already_inverted {
364 if (write + sz) < read {
365 // Inverted, room is still available
366 write
367 } else {
368 // Inverted, no room is available
369 inner.write_in_progress.store(false, Release);
370 return Err(Error::InsufficientSize);
371 }
372 } else {
373 if write + sz <= max {
374 // Non inverted condition
375 write
376 } else {
377 // Not inverted, but need to go inverted
378
379 // NOTE: We check sz < read, NOT <=, because
380 // write must never == read in an inverted condition, since
381 // we will then not be able to tell if we are inverted or not
382 if sz < read {
383 // Invertible situation
384 0
385 } else {
386 // Not invertible, no space
387 inner.write_in_progress.store(false, Release);
388 return Err(Error::InsufficientSize);
389 }
390 }
391 };
392
393 // Safe write, only viewed by this task
394 inner.reserve.store(start + sz, Release);
395
396 // This is sound, as UnsafeCell, MaybeUninit, and GenericArray
397 // are all `#[repr(Transparent)]
398 let start_of_buf_ptr = inner.buf.get().cast::<u8>();
399 let grant_slice =
400 unsafe { from_raw_parts_mut(start_of_buf_ptr.offset(start as isize), sz) };
401
402 Ok(GrantW {
403 buf: grant_slice,
404 bbq: self.bbq,
405 to_commit: 0,
406 })
407 }
408
409 /// Request a writable, contiguous section of memory of up to
410 /// `sz` bytes. If a buffer of size `sz` is not available without
411 /// wrapping, but some space (0 < available < sz) is available without
412 /// wrapping, then a grant will be given for the remaining size at the
413 /// end of the buffer. If no space is available for writing, an error
414 /// will be returned.
415 ///
416 /// ```
417 /// # // bbqueue test shim!
418 /// # fn bbqtest() {
419 /// use bbqueue::BBBuffer;
420 ///
421 /// // Create and split a new buffer of 6 elements
422 /// let buffer: BBBuffer<6> = BBBuffer::new();
423 /// let (mut prod, mut cons) = buffer.try_split().unwrap();
424 ///
425 /// // Successfully obtain and commit a grant of four bytes
426 /// let mut grant = prod.grant_max_remaining(4).unwrap();
427 /// assert_eq!(grant.buf().len(), 4);
428 /// grant.commit(4);
429 ///
430 /// // Release the four initial commited bytes
431 /// let mut grant = cons.read().unwrap();
432 /// assert_eq!(grant.buf().len(), 4);
433 /// grant.release(4);
434 ///
435 /// // Try to obtain a grant of three bytes, get two bytes
436 /// let mut grant = prod.grant_max_remaining(3).unwrap();
437 /// assert_eq!(grant.buf().len(), 2);
438 /// grant.commit(2);
439 /// # // bbqueue test shim!
440 /// # }
441 /// #
442 /// # fn main() {
443 /// # #[cfg(not(feature = "thumbv6"))]
444 /// # bbqtest();
445 /// # }
446 /// ```
447 pub fn grant_max_remaining(&self, mut sz: usize) -> Result<GrantW<'a, N>> {
448 let inner = unsafe { &self.bbq.as_ref() };
449
450 if atomic::swap(&inner.write_in_progress, true, AcqRel) {
451 return Err(Error::GrantInProgress);
452 }
453
454 // Writer component. Must never write to `read`,
455 // be careful writing to `load`
456 let write = inner.write.load(Acquire);
457 let read = inner.read.load(Acquire);
458 let max = N;
459
460 let already_inverted = write < read;
461
462 let start = if already_inverted {
463 // In inverted case, read is always > write
464 let remain = read - write - 1;
465
466 if remain != 0 {
467 sz = min(remain, sz);
468 write
469 } else {
470 // Inverted, no room is available
471 inner.write_in_progress.store(false, Release);
472 return Err(Error::InsufficientSize);
473 }
474 } else {
475 if write != max {
476 // Some (or all) room remaining in un-inverted case
477 sz = min(max - write, sz);
478 write
479 } else {
480 // Not inverted, but need to go inverted
481
482 // NOTE: We check read > 1, NOT read >= 1, because
483 // write must never == read in an inverted condition, since
484 // we will then not be able to tell if we are inverted or not
485 if read > 1 {
486 sz = min(read - 1, sz);
487 0
488 } else {
489 // Not invertible, no space
490 inner.write_in_progress.store(false, Release);
491 return Err(Error::InsufficientSize);
492 }
493 }
494 };
495
496 // Safe write, only viewed by this task
497 inner.reserve.store(start + sz, Release);
498
499 // This is sound, as UnsafeCell, MaybeUninit, and GenericArray
500 // are all `#[repr(Transparent)]
501 let start_of_buf_ptr = inner.buf.get().cast::<u8>();
502 let grant_slice =
503 unsafe { from_raw_parts_mut(start_of_buf_ptr.offset(start as isize), sz) };
504
505 Ok(GrantW {
506 buf: grant_slice,
507 bbq: self.bbq,
508 to_commit: 0,
509 })
510 }
511}
512
513/// `Consumer` is the primary interface for reading data from a `BBBuffer`.
514pub struct Consumer<'a, const N: usize> {
515 bbq: NonNull<BBBuffer<N>>,
516 pd: PhantomData<&'a ()>,
517}
518
519unsafe impl<'a, const N: usize> Send for Consumer<'a, N> {}
520unsafe impl<'a, const N: usize> Sync for Consumer<'a, N> {}
521
522impl<'a, const N: usize> Consumer<'a, N> {
523 /// Obtains a contiguous slice of committed bytes. This slice may not
524 /// contain ALL available bytes, if the writer has wrapped around. The
525 /// remaining bytes will be available after all readable bytes are
526 /// released
527 ///
528 /// ```rust
529 /// # // bbqueue test shim!
530 /// # fn bbqtest() {
531 /// use bbqueue::BBBuffer;
532 ///
533 /// // Create and split a new buffer of 6 elements
534 /// let buffer: BBBuffer<6> = BBBuffer::new();
535 /// let (mut prod, mut cons) = buffer.try_split().unwrap();
536 ///
537 /// // Successfully obtain and commit a grant of four bytes
538 /// let mut grant = prod.grant_max_remaining(4).unwrap();
539 /// assert_eq!(grant.buf().len(), 4);
540 /// grant.commit(4);
541 ///
542 /// // Obtain a read grant
543 /// let mut grant = cons.read().unwrap();
544 /// assert_eq!(grant.buf().len(), 4);
545 /// # // bbqueue test shim!
546 /// # }
547 /// #
548 /// # fn main() {
549 /// # #[cfg(not(feature = "thumbv6"))]
550 /// # bbqtest();
551 /// # }
552 /// ```
553 pub fn read(&self) -> Result<GrantR<'a, N>> {
554 let inner = unsafe { &self.bbq.as_ref() };
555
556 if atomic::swap(&inner.read_in_progress, true, AcqRel) {
557 return Err(Error::GrantInProgress);
558 }
559
560 let write = inner.write.load(Acquire);
561 let last = inner.last.load(Acquire);
562 let mut read = inner.read.load(Acquire);
563
564 // Resolve the inverted case or end of read
565 if (read == last) && (write < read) {
566 read = 0;
567 // This has some room for error, the other thread reads this
568 // Impact to Grant:
569 // Grant checks if read < write to see if inverted. If not inverted, but
570 // no space left, Grant will initiate an inversion, but will not trigger it
571 // Impact to Commit:
572 // Commit does not check read, but if Grant has started an inversion,
573 // grant could move Last to the prior write position
574 // MOVING READ BACKWARDS!
575 inner.read.store(0, Release);
576 }
577
578 let sz = if write < read {
579 // Inverted, only believe last
580 last
581 } else {
582 // Not inverted, only believe write
583 write
584 } - read;
585
586 if sz == 0 {
587 inner.read_in_progress.store(false, Release);
588 return Err(Error::InsufficientSize);
589 }
590
591 // This is sound, as UnsafeCell, MaybeUninit, and GenericArray
592 // are all `#[repr(Transparent)]
593 let start_of_buf_ptr = inner.buf.get().cast::<u8>();
594 let grant_slice = unsafe { from_raw_parts_mut(start_of_buf_ptr.offset(read as isize), sz) };
595
596 Ok(GrantR {
597 buf: grant_slice,
598 bbq: self.bbq,
599 to_release: 0,
600 })
601 }
602
603 /// Obtains two disjoint slices, which are each contiguous of committed bytes.
604 /// Combined these contain all previously commited data.
605 pub fn split_read(&self) -> Result<SplitGrantR<'a, N>> {
606 let inner = unsafe { &self.bbq.as_ref() };
607
608 if atomic::swap(&inner.read_in_progress, true, AcqRel) {
609 return Err(Error::GrantInProgress);
610 }
611
612 let write = inner.write.load(Acquire);
613 let last = inner.last.load(Acquire);
614 let mut read = inner.read.load(Acquire);
615
616 // Resolve the inverted case or end of read
617 if (read == last) && (write < read) {
618 read = 0;
619 // This has some room for error, the other thread reads this
620 // Impact to Grant:
621 // Grant checks if read < write to see if inverted. If not inverted, but
622 // no space left, Grant will initiate an inversion, but will not trigger it
623 // Impact to Commit:
624 // Commit does not check read, but if Grant has started an inversion,
625 // grant could move Last to the prior write position
626 // MOVING READ BACKWARDS!
627 inner.read.store(0, Release);
628 }
629
630 let (sz1, sz2) = if write < read {
631 // Inverted, only believe last
632 (last - read, write)
633 } else {
634 // Not inverted, only believe write
635 (write - read, 0)
636 };
637
638 if sz1 == 0 {
639 inner.read_in_progress.store(false, Release);
640 return Err(Error::InsufficientSize);
641 }
642
643 // This is sound, as UnsafeCell, MaybeUninit, and GenericArray
644 // are all `#[repr(Transparent)]
645 let start_of_buf_ptr = inner.buf.get().cast::<u8>();
646 let grant_slice1 =
647 unsafe { from_raw_parts_mut(start_of_buf_ptr.offset(read as isize), sz1) };
648 let grant_slice2 = unsafe { from_raw_parts_mut(start_of_buf_ptr, sz2) };
649
650 Ok(SplitGrantR {
651 buf1: grant_slice1,
652 buf2: grant_slice2,
653 bbq: self.bbq,
654 to_release: 0,
655 })
656 }
657}
658
659impl<const N: usize> BBBuffer<N> {
660 /// Returns the size of the backing storage.
661 ///
662 /// This is the maximum number of bytes that can be stored in this queue.
663 ///
664 /// ```rust
665 /// # // bbqueue test shim!
666 /// # fn bbqtest() {
667 /// use bbqueue::BBBuffer;
668 ///
669 /// // Create a new buffer of 6 elements
670 /// let buffer: BBBuffer<6> = BBBuffer::new();
671 /// assert_eq!(buffer.capacity(), 6);
672 /// # // bbqueue test shim!
673 /// # }
674 /// #
675 /// # fn main() {
676 /// # #[cfg(not(feature = "thumbv6"))]
677 /// # bbqtest();
678 /// # }
679 /// ```
680 pub const fn capacity(&self) -> usize {
681 N
682 }
683}
684
685/// A structure representing a contiguous region of memory that
686/// may be written to, and potentially "committed" to the queue.
687///
688/// NOTE: If the grant is dropped without explicitly commiting
689/// the contents, or by setting a the number of bytes to
690/// automatically be committed with `to_commit()`, then no bytes
691/// will be comitted for writing.
692///
693/// If the `thumbv6` feature is selected, dropping the grant
694/// without committing it takes a short critical section,
695#[derive(Debug, PartialEq)]
696pub struct GrantW<'a, const N: usize> {
697 pub(crate) buf: &'a mut [u8],
698 bbq: NonNull<BBBuffer<N>>,
699 pub(crate) to_commit: usize,
700}
701
702unsafe impl<'a, const N: usize> Send for GrantW<'a, N> {}
703
704/// A structure representing a contiguous region of memory that
705/// may be read from, and potentially "released" (or cleared)
706/// from the queue
707///
708/// NOTE: If the grant is dropped without explicitly releasing
709/// the contents, or by setting the number of bytes to automatically
710/// be released with `to_release()`, then no bytes will be released
711/// as read.
712///
713///
714/// If the `thumbv6` feature is selected, dropping the grant
715/// without releasing it takes a short critical section,
716#[derive(Debug, PartialEq)]
717pub struct GrantR<'a, const N: usize> {
718 pub(crate) buf: &'a mut [u8],
719 bbq: NonNull<BBBuffer<N>>,
720 pub(crate) to_release: usize,
721}
722
723/// A structure representing up to two contiguous regions of memory that
724/// may be read from, and potentially "released" (or cleared)
725/// from the queue
726#[derive(Debug, PartialEq)]
727pub struct SplitGrantR<'a, const N: usize> {
728 pub(crate) buf1: &'a mut [u8],
729 pub(crate) buf2: &'a mut [u8],
730 bbq: NonNull<BBBuffer<N>>,
731 pub(crate) to_release: usize,
732}
733
734unsafe impl<'a, const N: usize> Send for GrantR<'a, N> {}
735
736unsafe impl<'a, const N: usize> Send for SplitGrantR<'a, N> {}
737
738impl<'a, const N: usize> GrantW<'a, N> {
739 /// Finalizes a writable grant given by `grant()` or `grant_max()`.
740 /// This makes the data available to be read via `read()`. This consumes
741 /// the grant.
742 ///
743 /// If `used` is larger than the given grant, the maximum amount will
744 /// be commited
745 ///
746 /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
747 /// section while committing.
748 pub fn commit(mut self, used: usize) {
749 self.commit_inner(used);
750 forget(self);
751 }
752
753 /// Obtain access to the inner buffer for writing
754 ///
755 /// ```rust
756 /// # // bbqueue test shim!
757 /// # fn bbqtest() {
758 /// use bbqueue::BBBuffer;
759 ///
760 /// // Create and split a new buffer of 6 elements
761 /// let buffer: BBBuffer<6> = BBBuffer::new();
762 /// let (mut prod, mut cons) = buffer.try_split().unwrap();
763 ///
764 /// // Successfully obtain and commit a grant of four bytes
765 /// let mut grant = prod.grant_max_remaining(4).unwrap();
766 /// grant.buf().copy_from_slice(&[1, 2, 3, 4]);
767 /// grant.commit(4);
768 /// # // bbqueue test shim!
769 /// # }
770 /// #
771 /// # fn main() {
772 /// # #[cfg(not(feature = "thumbv6"))]
773 /// # bbqtest();
774 /// # }
775 /// ```
776 pub fn buf(&mut self) -> &mut [u8] {
777 self.buf
778 }
779
780 /// Sometimes, it's not possible for the lifetimes to check out. For example,
781 /// if you need to hand this buffer to a function that expects to receive a
782 /// `&'static mut [u8]`, it is not possible for the inner reference to outlive the
783 /// grant itself.
784 ///
785 /// You MUST guarantee that in no cases, the reference that is returned here outlives
786 /// the grant itself. Once the grant has been released, referencing the data contained
787 /// WILL cause undefined behavior.
788 ///
789 /// Additionally, you must ensure that a separate reference to this data is not created
790 /// to this data, e.g. using `DerefMut` or the `buf()` method of this grant.
791 pub unsafe fn as_static_mut_buf(&mut self) -> &'static mut [u8] {
792 transmute::<&mut [u8], &'static mut [u8]>(self.buf)
793 }
794
795 #[inline(always)]
796 pub(crate) fn commit_inner(&mut self, used: usize) {
797 let inner = unsafe { &self.bbq.as_ref() };
798
799 // If there is no grant in progress, return early. This
800 // generally means we are dropping the grant within a
801 // wrapper structure
802 if !inner.write_in_progress.load(Acquire) {
803 return;
804 }
805
806 // Writer component. Must never write to READ,
807 // be careful writing to LAST
808
809 // Saturate the grant commit
810 let len = self.buf.len();
811 let used = min(len, used);
812
813 let write = inner.write.load(Acquire);
814 atomic::fetch_sub(&inner.reserve, len - used, AcqRel);
815
816 let max = N;
817 let last = inner.last.load(Acquire);
818 let new_write = inner.reserve.load(Acquire);
819
820 if (new_write < write) && (write != max) {
821 // We have already wrapped, but we are skipping some bytes at the end of the ring.
822 // Mark `last` where the write pointer used to be to hold the line here
823 inner.last.store(write, Release);
824 } else if new_write > last {
825 // We're about to pass the last pointer, which was previously the artificial
826 // end of the ring. Now that we've passed it, we can "unlock" the section
827 // that was previously skipped.
828 //
829 // Since new_write is strictly larger than last, it is safe to move this as
830 // the other thread will still be halted by the (about to be updated) write
831 // value
832 inner.last.store(max, Release);
833 }
834 // else: If new_write == last, either:
835 // * last == max, so no need to write, OR
836 // * If we write in the end chunk again, we'll update last to max next time
837 // * If we write to the start chunk in a wrap, we'll update last when we
838 // move write backwards
839
840 // Write must be updated AFTER last, otherwise read could think it was
841 // time to invert early!
842 inner.write.store(new_write, Release);
843
844 // Allow subsequent grants
845 inner.write_in_progress.store(false, Release);
846 }
847
848 /// Configures the amount of bytes to be commited on drop.
849 pub fn to_commit(&mut self, amt: usize) {
850 self.to_commit = self.buf.len().min(amt);
851 }
852}
853
854impl<'a, const N: usize> GrantR<'a, N> {
855 /// Release a sequence of bytes from the buffer, allowing the space
856 /// to be used by later writes. This consumes the grant.
857 ///
858 /// If `used` is larger than the given grant, the full grant will
859 /// be released.
860 ///
861 /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
862 /// section while releasing.
863 pub fn release(mut self, used: usize) {
864 // Saturate the grant release
865 let used = min(self.buf.len(), used);
866
867 self.release_inner(used);
868 forget(self);
869 }
870
871 pub(crate) fn shrink(&mut self, len: usize) {
872 let mut new_buf: &mut [u8] = &mut [];
873 core::mem::swap(&mut self.buf, &mut new_buf);
874 let (new, _) = new_buf.split_at_mut(len);
875 self.buf = new;
876 }
877
878 /// Obtain access to the inner buffer for reading
879 ///
880 /// ```
881 /// # // bbqueue test shim!
882 /// # fn bbqtest() {
883 /// use bbqueue::BBBuffer;
884 ///
885 /// // Create and split a new buffer of 6 elements
886 /// let buffer: BBBuffer<6> = BBBuffer::new();
887 /// let (mut prod, mut cons) = buffer.try_split().unwrap();
888 ///
889 /// // Successfully obtain and commit a grant of four bytes
890 /// let mut grant = prod.grant_max_remaining(4).unwrap();
891 /// grant.buf().copy_from_slice(&[1, 2, 3, 4]);
892 /// grant.commit(4);
893 ///
894 /// // Obtain a read grant, and copy to a buffer
895 /// let mut grant = cons.read().unwrap();
896 /// let mut buf = [0u8; 4];
897 /// buf.copy_from_slice(grant.buf());
898 /// assert_eq!(&buf, &[1, 2, 3, 4]);
899 /// # // bbqueue test shim!
900 /// # }
901 /// #
902 /// # fn main() {
903 /// # #[cfg(not(feature = "thumbv6"))]
904 /// # bbqtest();
905 /// # }
906 /// ```
907 pub fn buf(&self) -> &[u8] {
908 self.buf
909 }
910
911 /// Obtain mutable access to the read grant
912 ///
913 /// This is useful if you are performing in-place operations
914 /// on an incoming packet, such as decryption
915 pub fn buf_mut(&mut self) -> &mut [u8] {
916 self.buf
917 }
918
919 /// Sometimes, it's not possible for the lifetimes to check out. For example,
920 /// if you need to hand this buffer to a function that expects to receive a
921 /// `&'static [u8]`, it is not possible for the inner reference to outlive the
922 /// grant itself.
923 ///
924 /// You MUST guarantee that in no cases, the reference that is returned here outlives
925 /// the grant itself. Once the grant has been released, referencing the data contained
926 /// WILL cause undefined behavior.
927 ///
928 /// Additionally, you must ensure that a separate reference to this data is not created
929 /// to this data, e.g. using `Deref` or the `buf()` method of this grant.
930 pub unsafe fn as_static_buf(&self) -> &'static [u8] {
931 transmute::<&[u8], &'static [u8]>(self.buf)
932 }
933
934 #[inline(always)]
935 pub(crate) fn release_inner(&mut self, used: usize) {
936 let inner = unsafe { &self.bbq.as_ref() };
937
938 // If there is no grant in progress, return early. This
939 // generally means we are dropping the grant within a
940 // wrapper structure
941 if !inner.read_in_progress.load(Acquire) {
942 return;
943 }
944
945 // This should always be checked by the public interfaces
946 debug_assert!(used <= self.buf.len());
947
948 // This should be fine, purely incrementing
949 let _ = atomic::fetch_add(&inner.read, used, Release);
950
951 inner.read_in_progress.store(false, Release);
952 }
953
954 /// Configures the amount of bytes to be released on drop.
955 pub fn to_release(&mut self, amt: usize) {
956 self.to_release = self.buf.len().min(amt);
957 }
958}
959
960impl<'a, const N: usize> SplitGrantR<'a, N> {
961 /// Release a sequence of bytes from the buffer, allowing the space
962 /// to be used by later writes. This consumes the grant.
963 ///
964 /// If `used` is larger than the given grant, the full grant will
965 /// be released.
966 ///
967 /// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
968 /// section while releasing.
969 pub fn release(mut self, used: usize) {
970 // Saturate the grant release
971 let used = min(self.combined_len(), used);
972
973 self.release_inner(used);
974 forget(self);
975 }
976
977 /// Obtain access to both inner buffers for reading
978 ///
979 /// ```
980 /// # // bbqueue test shim!
981 /// # fn bbqtest() {
982 /// use bbqueue::BBBuffer;
983 ///
984 /// // Create and split a new buffer of 6 elements
985 /// let buffer: BBBuffer<6> = BBBuffer::new();
986 /// let (mut prod, mut cons) = buffer.try_split().unwrap();
987 ///
988 /// // Successfully obtain and commit a grant of four bytes
989 /// let mut grant = prod.grant_max_remaining(4).unwrap();
990 /// grant.buf().copy_from_slice(&[1, 2, 3, 4]);
991 /// grant.commit(4);
992 ///
993 /// // Obtain a read grant, and copy to a buffer
994 /// let mut grant = cons.read().unwrap();
995 /// let mut buf = [0u8; 4];
996 /// buf.copy_from_slice(grant.buf());
997 /// assert_eq!(&buf, &[1, 2, 3, 4]);
998 /// # // bbqueue test shim!
999 /// # }
1000 /// #
1001 /// # fn main() {
1002 /// # #[cfg(not(feature = "thumbv6"))]
1003 /// # bbqtest();
1004 /// # }
1005 /// ```
1006 pub fn bufs(&self) -> (&[u8], &[u8]) {
1007 (self.buf1, self.buf2)
1008 }
1009
1010 /// Obtain mutable access to both parts of the read grant
1011 ///
1012 /// This is useful if you are performing in-place operations
1013 /// on an incoming packet, such as decryption
1014 pub fn bufs_mut(&mut self) -> (&mut [u8], &mut [u8]) {
1015 (self.buf1, self.buf2)
1016 }
1017
1018 #[inline(always)]
1019 pub(crate) fn release_inner(&mut self, used: usize) {
1020 let inner = unsafe { &self.bbq.as_ref() };
1021
1022 // If there is no grant in progress, return early. This
1023 // generally means we are dropping the grant within a
1024 // wrapper structure
1025 if !inner.read_in_progress.load(Acquire) {
1026 return;
1027 }
1028
1029 // This should always be checked by the public interfaces
1030 debug_assert!(used <= self.combined_len());
1031
1032 if used <= self.buf1.len() {
1033 // This should be fine, purely incrementing
1034 let _ = atomic::fetch_add(&inner.read, used, Release);
1035 } else {
1036 // Also release parts of the second buffer
1037 inner.read.store(used - self.buf1.len(), Release);
1038 }
1039
1040 inner.read_in_progress.store(false, Release);
1041 }
1042
1043 /// Configures the amount of bytes to be released on drop.
1044 pub fn to_release(&mut self, amt: usize) {
1045 self.to_release = self.combined_len().min(amt);
1046 }
1047
1048 /// The combined length of both buffers
1049 pub fn combined_len(&self) -> usize {
1050 self.buf1.len() + self.buf2.len()
1051 }
1052}
1053
1054impl<'a, const N: usize> Drop for GrantW<'a, N> {
1055 fn drop(&mut self) {
1056 self.commit_inner(self.to_commit)
1057 }
1058}
1059
1060impl<'a, const N: usize> Drop for GrantR<'a, N> {
1061 fn drop(&mut self) {
1062 self.release_inner(self.to_release)
1063 }
1064}
1065
1066impl<'a, const N: usize> Drop for SplitGrantR<'a, N> {
1067 fn drop(&mut self) {
1068 self.release_inner(self.to_release)
1069 }
1070}
1071
1072impl<'a, const N: usize> Deref for GrantW<'a, N> {
1073 type Target = [u8];
1074
1075 fn deref(&self) -> &Self::Target {
1076 self.buf
1077 }
1078}
1079
1080impl<'a, const N: usize> DerefMut for GrantW<'a, N> {
1081 fn deref_mut(&mut self) -> &mut [u8] {
1082 self.buf
1083 }
1084}
1085
1086impl<'a, const N: usize> Deref for GrantR<'a, N> {
1087 type Target = [u8];
1088
1089 fn deref(&self) -> &Self::Target {
1090 self.buf
1091 }
1092}
1093
1094impl<'a, const N: usize> DerefMut for GrantR<'a, N> {
1095 fn deref_mut(&mut self) -> &mut [u8] {
1096 self.buf
1097 }
1098}
1099
1100#[cfg(feature = "thumbv6")]
1101mod atomic {
1102 use core::sync::atomic::{
1103 AtomicBool, AtomicUsize,
1104 Ordering::{self, Acquire, Release},
1105 };
1106 use cortex_m::interrupt::free;
1107
1108 #[inline(always)]
1109 pub fn fetch_add(atomic: &AtomicUsize, val: usize, _order: Ordering) -> usize {
1110 free(|_| {
1111 let prev = atomic.load(Acquire);
1112 atomic.store(prev.wrapping_add(val), Release);
1113 prev
1114 })
1115 }
1116
1117 #[inline(always)]
1118 pub fn fetch_sub(atomic: &AtomicUsize, val: usize, _order: Ordering) -> usize {
1119 free(|_| {
1120 let prev = atomic.load(Acquire);
1121 atomic.store(prev.wrapping_sub(val), Release);
1122 prev
1123 })
1124 }
1125
1126 #[inline(always)]
1127 pub fn swap(atomic: &AtomicBool, val: bool, _order: Ordering) -> bool {
1128 free(|_| {
1129 let prev = atomic.load(Acquire);
1130 atomic.store(val, Release);
1131 prev
1132 })
1133 }
1134}
1135
1136#[cfg(not(feature = "thumbv6"))]
1137mod atomic {
1138 use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
1139
1140 #[inline(always)]
1141 pub fn fetch_add(atomic: &AtomicUsize, val: usize, order: Ordering) -> usize {
1142 atomic.fetch_add(val, order)
1143 }
1144
1145 #[inline(always)]
1146 pub fn fetch_sub(atomic: &AtomicUsize, val: usize, order: Ordering) -> usize {
1147 atomic.fetch_sub(val, order)
1148 }
1149
1150 #[inline(always)]
1151 pub fn swap(atomic: &AtomicBool, val: bool, order: Ordering) -> bool {
1152 atomic.swap(val, order)
1153 }
1154}