rust_mqtt/
buffer.rs

1//! Contains the trait the client uses to store slices of memory and basic implementations.
2
3use crate::bytes::Bytes;
4
5#[cfg(feature = "alloc")]
6pub use alloc::AllocBuffer;
// `InsufficientSpace` is the `ProvisionError` of `BumpBuffer`; re-export it too so that
// code outside this module can name the error type it receives.
#[cfg(feature = "bump")]
pub use bump::{BumpBuffer, InsufficientSpace};
9
/// A trait to describe anything that can allocate memory.
///
/// Returned memory can be borrowed or owned. Either way, it is bound by the `'a`
/// lifetime - usually just the lifetime of the underlying buffer.
///
/// The client does not store any references to memory returned by this provider.
pub trait BufferProvider<'a> {
    /// The type returned from a successful buffer provision.
    ///
    /// Must implement `AsMut<[u8]>` so that it can be borrowed mutably right after
    /// allocation for initialization, and `Into<Bytes<'a>>` so that it can be converted
    /// into [`Bytes`] for storing.
    type Buffer: AsMut<[u8]> + Into<Bytes<'a>>;

    /// The error type returned from a failed buffer provision.
    ///
    /// Only `Debug` is required of it.
    type ProvisionError: core::fmt::Debug;

    /// If successful, returns contiguous memory with a size in bytes of the `len` argument.
    ///
    /// # Errors
    /// Returns `Self::ProvisionError` when the provider cannot supply the requested memory.
    fn provide_buffer(&mut self, len: usize) -> Result<Self::Buffer, Self::ProvisionError>;
}
28
29#[cfg(feature = "bump")]
30mod bump {
31    use core::slice;
32
33    use crate::buffer::BufferProvider;
34
    /// Error returned by [`BumpBuffer`] when a request asks for more bytes than are still
    /// unallocated in its backing slice.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    #[cfg_attr(feature = "defmt", derive(defmt::Format))]
    pub struct InsufficientSpace;
38
    /// Allocates memory from an underlying buffer by bumping up an index by the requested length.
    ///
    /// Can be reset (see `BumpBuffer::reset`) when no references to buffer contents exist.
    #[derive(Debug)]
    pub struct BumpBuffer<'a> {
        /// Backing storage that every provided buffer is carved out of.
        slice: &'a mut [u8],
        /// Offset into `slice` of the first byte that has not been handed out yet.
        index: usize,
    }
47
48    impl<'a> BufferProvider<'a> for BumpBuffer<'a> {
49        type Buffer = &'a mut [u8];
50        type ProvisionError = InsufficientSpace;
51
52        /// Return the next `len` bytes from the buffer, advancing the internal
53        /// pointer. Returns `InsufficientSpace` if there isn't enough room.
54        fn provide_buffer(&mut self, len: usize) -> Result<Self::Buffer, Self::ProvisionError> {
55            if self.remaining_len() < len {
56                Err(InsufficientSpace)
57            } else {
58                let start = self.index;
59                // Safety: we checked bounds above, and the pointer originates from
60                // the backing slice owned by this struct with the same lifetime.
61                let ptr = unsafe { self.slice.as_mut_ptr().add(start) };
62                // Advance index after computing start so callers don't observe a
63                // partially-advanced state if we ever change ordering.
64                self.index += len;
65
66                // Safety: the slice starts at the self.index which is not part of any other reservations.
67                // self.index has been skipped ahead the slice's full length
68                let slice = unsafe { slice::from_raw_parts_mut(ptr, len) };
69
70                Ok(slice)
71            }
72        }
73    }
74
75    impl<'a> BumpBuffer<'a> {
76        /// Creates a new `BumpBuffer` with the provided slice as underlying buffer.
77        pub fn new(slice: &'a mut [u8]) -> Self {
78            Self { slice, index: 0 }
79        }
80
81        /// Returns the remaining amount of unallocated bytes in the underlying buffer.
82        #[inline]
83        pub fn remaining_len(&self) -> usize {
84            self.slice.len() - self.index
85        }
86
87        /// Invalidates all previous allocations by resetting the `BumpBuffer`'s index,
88        /// allowing the underlying buffer to be reallocated.
89        ///
90        /// # Safety
91        /// No more references exist to previously allocated slices / underlying buffer content.
92        /// In the context of the client, this is the case when no server publication content
93        /// (topic & message) and no reason strings still held.
94        #[inline]
95        pub unsafe fn reset(&mut self) {
96            self.index = 0;
97        }
98    }
99
100    #[cfg(test)]
101    mod unit {
102        use tokio_test::{assert_err, assert_ok};
103
104        use super::*;
105
106        #[test]
107        fn provide_buffer_and_remaining_len() {
108            let mut backing = [0; 10];
109
110            {
111                let mut buf = BumpBuffer::new(&mut backing);
112
113                assert_eq!(buf.remaining_len(), 10);
114
115                let s1 = assert_ok!(buf.provide_buffer(4));
116                assert_eq!(s1.len(), 4);
117
118                s1.copy_from_slice(&[1, 2, 3, 4]);
119                assert_eq!(buf.remaining_len(), 6);
120
121                // take remaining 6 bytes
122                let s2 = assert_ok!(buf.provide_buffer(6));
123                assert_eq!(s2.len(), 6);
124
125                s2.copy_from_slice(&[5, 6, 7, 8, 9, 10]);
126                assert_eq!(buf.remaining_len(), 0);
127
128                assert_eq!(s1, [1, 2, 3, 4]);
129                assert_eq!(s2, [5, 6, 7, 8, 9, 10]);
130
131                let err = assert_err!(buf.provide_buffer(1));
132                assert_eq!(err, InsufficientSpace);
133            }
134
135            assert_eq!(backing, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
136        }
137
138        #[test]
139        fn reset_allows_reuse() {
140            let mut backing = [0; 6];
141
142            {
143                let mut buf = BumpBuffer::new(&mut backing);
144
145                let s1 = assert_ok!(buf.provide_buffer(3));
146                s1.copy_from_slice(&[11, 12, 13]);
147
148                // reset and take again from start
149                unsafe { buf.reset() }
150                let s2 = assert_ok!(buf.provide_buffer(3));
151                assert_eq!(s1, s2);
152            }
153
154            assert_eq!(backing, [11, 12, 13, 0, 0, 0]);
155        }
156    }
157}
158
159#[cfg(feature = "alloc")]
160mod alloc {
161    use core::convert::Infallible;
162
163    use alloc::boxed::Box;
164    use alloc::vec;
165
166    use crate::buffer::BufferProvider;
167
    /// Allocates memory using the global allocator.
    ///
    /// Stateless unit type: it carries no data of its own.
    #[derive(Debug)]
    pub struct AllocBuffer;
171
172    impl<'a> BufferProvider<'a> for AllocBuffer {
173        type Buffer = Box<[u8]>;
174        type ProvisionError = Infallible;
175
176        /// Allocates `len` bytes on the heap
177        fn provide_buffer(&mut self, len: usize) -> Result<Self::Buffer, Self::ProvisionError> {
178            let buffer = vec![0; len].into_boxed_slice();
179
180            Ok(buffer)
181        }
182    }
183
184    #[cfg(test)]
185    mod unit {
186        use crate::buffer::{BufferProvider, alloc::AllocBuffer};
187        use tokio_test::assert_ok;
188
189        #[test]
190        fn provide_buffer() {
191            let mut alloc = AllocBuffer;
192
193            let buffer = alloc.provide_buffer(10);
194            let buffer = assert_ok!(buffer);
195            assert_eq!(10, buffer.len());
196        }
197    }
198}