fluke_buffet/bufpool.rs

use std::{
    cell::{RefCell, RefMut},
    collections::VecDeque,
    marker::PhantomData,
    ops::{self, Bound, RangeBounds},
};

use memmap2::MmapMut;

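/// The io-uring style owned-buffer result: the buffer travels with the
/// outcome, so the caller regains ownership whether the operation succeeded
/// or failed.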
pub type BufResult<T, B> = (std::io::Result<T>, B);

pub const BUF_SIZE: u16 = 4096;

#[cfg(not(feature = "miri"))]
pub const NUM_BUF: u32 = 64 * 1024;

#[cfg(feature = "miri")]
pub const NUM_BUF: u32 = 64;

thread_local! {
    pub static BUF_POOL: BufPool = const { BufPool::new_empty(BUF_SIZE, NUM_BUF) };
    static BUF_POOL_DESTRUCTOR: RefCell<Option<MmapMut>> = const { RefCell::new(None) };
}
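
// Back-of-the-envelope footprint with the non-miri defaults: NUM_BUF
// (64 * 1024) * BUF_SIZE (4096) = 256 MiB of anonymous memory per thread.
// The mapping is only created on first allocation, and anonymous pages are
// only committed once written to, so resident memory tracks actual usage.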

type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("could not mmap buffer")]
    Mmap(#[from] std::io::Error),

    #[error("out of memory")]
    OutOfMemory,

    #[error("slice does not fit into this RollMut")]
    DoesNotFit,
}

/// A buffer pool
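///
/// Hands out fixed-size blocks from one lazily-mapped chunk of anonymous
/// memory. Each block is reference-counted: allocating, cloning or splitting
/// a handle bumps the count, and the last drop pushes the block back onto
/// the free list.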
pub(crate) struct BufPool {
    buf_size: u16,
    num_buf: u32,
    inner: RefCell<Option<BufPoolInner>>,
}

struct BufPoolInner {
    // this is tied to an [MmapMut] that gets deallocated at thread exit
    // thanks to [BUF_POOL_DESTRUCTOR]
    ptr: *mut u8,

    // indices of free blocks
    free: VecDeque<u32>,

    // ref counts start as all zeroes, get incremented when a block is borrowed
    ref_counts: Vec<i16>,
}

impl BufPool {
    pub(crate) const fn new_empty(buf_size: u16, num_buf: u32) -> BufPool {
        BufPool {
            buf_size,
            num_buf,
            inner: RefCell::new(None),
        }
    }

    pub(crate) fn alloc(&self) -> Result<BufMut> {
        let mut inner = self.borrow_mut()?;

        if let Some(index) = inner.free.pop_front() {
            inner.ref_counts[index as usize] += 1;
            Ok(BufMut {
                index,
                off: 0,
                len: self.buf_size as _,
                _non_send: PhantomData,
            })
        } else {
            Err(Error::OutOfMemory)
        }
    }

    fn inc(&self, index: u32) {
        let mut inner = self.inner.borrow_mut();
        let inner = inner.as_mut().unwrap();

        inner.ref_counts[index as usize] += 1;
    }

    fn dec(&self, index: u32) {
        let mut inner = self.inner.borrow_mut();
        let inner = inner.as_mut().unwrap();

        inner.ref_counts[index as usize] -= 1;
        if inner.ref_counts[index as usize] == 0 {
            inner.free.push_back(index);
        }
    }

    #[cfg(test)]
    pub(crate) fn num_free(&self) -> Result<usize> {
        Ok(self.borrow_mut()?.free.len())
    }

    fn borrow_mut(&self) -> Result<RefMut<BufPoolInner>> {
        let mut inner = self.inner.borrow_mut();
        if inner.is_none() {
            let len = self.num_buf as usize * self.buf_size as usize;

            let ptr: *mut u8;

            #[cfg(feature = "miri")]
            {
                let mut map = vec![0; len];
                ptr = map.as_mut_ptr();
                std::mem::forget(map);
            }

            #[cfg(not(feature = "miri"))]
            {
                let mut map = memmap2::MmapOptions::new().len(len).map_anon()?;
                ptr = map.as_mut_ptr();
                BUF_POOL_DESTRUCTOR.with(|destructor| {
                    *destructor.borrow_mut() = Some(map);
                });
            }

            let mut free = VecDeque::with_capacity(self.num_buf as usize);
            for i in 0..self.num_buf {
                free.push_back(i);
            }
            let ref_counts = vec![0; self.num_buf as usize];

            *inner = Some(BufPoolInner {
                ptr,
                free,
                ref_counts,
            });
        }

        let r = RefMut::map(inner, |o| o.as_mut().unwrap());
        Ok(r)
    }

    /// Returns the base pointer for a block
    ///
    /// # Safety
    ///
    /// Borrow-checking is on you!
    #[inline(always)]
    unsafe fn base_ptr(&self, index: u32) -> *mut u8 {
        let start = index as usize * self.buf_size as usize;
        self.inner.borrow_mut().as_mut().unwrap().ptr.add(start)
    }
}

/// A mutable buffer. Cannot be cloned, but can be written to.
pub struct BufMut {
    pub(crate) index: u32,
    pub(crate) off: u16,
    pub(crate) len: u16,

    // makes this type non-Send, which we do want
    _non_send: PhantomData<*mut ()>,
}

impl BufMut {
    #[inline(always)]
    pub fn alloc() -> Result<BufMut, Error> {
        BUF_POOL.with(|bp| bp.alloc())
    }

    #[inline(always)]
    pub fn len(&self) -> usize {
        self.len as _
    }

    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Turn this buffer immutable. The reference count doesn't change, but the
    /// immutable view can be cloned.
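    ///
    /// A minimal sketch of the intended flow (`ignore`d: it needs the
    /// thread-local pool, and error handling is elided):
    ///
    /// ```ignore
    /// let mut bm = BufMut::alloc()?;
    /// bm[..5].copy_from_slice(b"hello");
    /// let frozen: Buf = bm.freeze(); // same block, same ref count
    /// let copy = frozen.clone(); // ref count bump, no data copy
    /// assert_eq!(&frozen[..5], &copy[..5]);
    /// ```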
    #[inline]
    pub fn freeze(self) -> Buf {
        let b = Buf {
            index: self.index,
            off: self.off,
            len: self.len,

            _non_send: PhantomData,
        };

        std::mem::forget(self); // don't decrease ref count

        b
    }

    /// Dangerous: freeze a slice of this. Must only be used if you can
    /// guarantee this portion won't be written to anymore.
    pub(crate) fn freeze_slice(&self, range: impl RangeBounds<usize>) -> Buf {
        // `self` stays alive and will decrement the ref count when dropped,
        // and so will the returned [Buf]: bump the count now so the block
        // isn't recycled while either handle is still around.
        BUF_POOL.with(|bp| bp.inc(self.index));

        let b = Buf {
            index: self.index,
            off: self.off,
            len: self.len,

            _non_send: PhantomData,
        };

        b.slice(range)
    }

    /// Split this buffer in twain. Both parts can be written to. Panics if
    /// `at` is out of bounds.
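    ///
    /// A quick sketch (`ignore`d, error handling elided):
    ///
    /// ```ignore
    /// let mut bm = BufMut::alloc()?;
    /// bm[..6].copy_from_slice(b"abcdef");
    /// let (left, right) = bm.split_at(3);
    /// // both halves point into the same pool block, which now has a
    /// // reference count of 2
    /// assert_eq!(&left[..], b"abc");
    /// assert_eq!(&right[..3], b"def");
    /// ```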
    #[inline]
    pub fn split_at(self, at: usize) -> (Self, Self) {
        assert!(at <= self.len as usize);

        let left = BufMut {
            index: self.index,
            off: self.off,
            len: at as _,

            _non_send: PhantomData,
        };

        let right = BufMut {
            index: self.index,
            off: self.off + at as u16,
            len: (self.len - at as u16),

            _non_send: PhantomData,
        };

        std::mem::forget(self); // don't decrease ref count
        BUF_POOL.with(|bp| bp.inc(left.index)); // in fact, increase it by 1

        (left, right)
    }

    /// Skip over the first `n` bytes. Panics if `n` is out of bounds.
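    ///
    /// E.g. after `buf.skip(2)`, the view starts two bytes further into the
    /// block and `buf.len()` is two less than before.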
    pub fn skip(&mut self, n: usize) {
        assert!(n <= self.len as usize);

        let u16_n: u16 = n.try_into().unwrap();
        self.off += u16_n;
        self.len -= u16_n;
    }
}

impl ops::Deref for BufMut {
    type Target = [u8];

    #[inline(always)]
    fn deref(&self) -> &[u8] {
        unsafe {
            std::slice::from_raw_parts(
                BUF_POOL.with(|bp| bp.base_ptr(self.index).add(self.off as _)),
                self.len as _,
            )
        }
    }
}

impl ops::DerefMut for BufMut {
    #[inline(always)]
    fn deref_mut(&mut self) -> &mut Self::Target {
        unsafe {
            std::slice::from_raw_parts_mut(
                BUF_POOL.with(|bp| bp.base_ptr(self.index).add(self.off as _)),
                self.len as _,
            )
        }
    }
}

mod iobufmut {
    use crate::{ReadInto, RollMut};

    use super::BufMut;
    pub trait Sealed {}
    impl Sealed for BufMut {}
    impl Sealed for RollMut {}
    impl Sealed for ReadInto {}
    impl Sealed for Vec<u8> {}
}

/// The IoBufMut trait is implemented by buffer types that can be passed to
/// io-uring operations.
///
/// # Safety
///
/// If the address returned by `io_buf_mut_stable_mut_ptr` is not actually stable
/// and moves while an io_uring operation is in-flight, the kernel might write
/// to the wrong memory location.
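///
/// A sketch of how a (hypothetical) io-uring driver might consume this trait:
///
/// ```ignore
/// fn zero_fill(buf: &mut impl IoBufMut) {
///     // Safety: the sealed implementors return valid pointer/capacity pairs.
///     unsafe { buf.slice_mut() }.fill(0);
/// }
/// ```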
pub unsafe trait IoBufMut: iobufmut::Sealed {
    /// Gets a pointer to the start of the buffer
    fn io_buf_mut_stable_mut_ptr(&mut self) -> *mut u8;

    /// Gets the capacity of the buffer
    fn io_buf_mut_capacity(&self) -> usize;

    /// Gets a mutable slice of the buffer
    ///
    /// # Safety
    ///
    /// An arbitrary implementor may return invalid pointers or lengths.
    unsafe fn slice_mut(&mut self) -> &mut [u8] {
        std::slice::from_raw_parts_mut(self.io_buf_mut_stable_mut_ptr(), self.io_buf_mut_capacity())
    }
}

unsafe impl IoBufMut for BufMut {
    fn io_buf_mut_stable_mut_ptr(&mut self) -> *mut u8 {
        unsafe { BUF_POOL.with(|bp| bp.base_ptr(self.index).add(self.off as _)) }
    }

    fn io_buf_mut_capacity(&self) -> usize {
        self.len as usize
    }
}

unsafe impl IoBufMut for Vec<u8> {
    fn io_buf_mut_stable_mut_ptr(&mut self) -> *mut u8 {
        self.as_mut_ptr()
    }

    fn io_buf_mut_capacity(&self) -> usize {
        self.capacity()
    }
}
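
// The Vec implementation reports `capacity()`, not `len()`: reads are
// expected to land in (possibly uninitialized) spare capacity, with the
// caller adjusting the vector's length once the operation completes. That
// calling convention is an assumption about the io-uring driver, not
// something this file spells out.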

impl Drop for BufMut {
    fn drop(&mut self) {
        BUF_POOL.with(|bp| bp.dec(self.index));
    }
}

/// A read-only buffer. Can be cloned, but cannot be written to.
pub struct Buf {
    pub(crate) index: u32,
    pub(crate) off: u16,
    pub(crate) len: u16,

    // makes this type non-Send, which we do want
    _non_send: PhantomData<*mut ()>,
}

impl Buf {
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.len as _
    }

    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Take an owned slice of this
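    ///
    /// Range bounds follow the usual Rust conventions, and out-of-range
    /// bounds panic. E.g. (`ignore`d sketch):
    ///
    /// ```ignore
    /// let sliced = buf.slice(2..5); // keeps bytes 2, 3 and 4
    /// assert_eq!(sliced.len(), 3);
    /// ```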
    pub fn slice(mut self, range: impl RangeBounds<usize>) -> Self {
        let mut new_start = 0;
        let mut new_end = self.len();

        match range.start_bound() {
            Bound::Included(&n) => new_start = n,
            Bound::Excluded(&n) => new_start = n + 1,
            Bound::Unbounded => {}
        }

        match range.end_bound() {
            Bound::Included(&n) => new_end = n + 1,
            Bound::Excluded(&n) => new_end = n,
            Bound::Unbounded => {}
        }

        assert!(new_start <= new_end);
        assert!(new_end <= self.len());

        self.off += new_start as u16;
        self.len = (new_end - new_start) as u16;
        self
    }

    /// Split this buffer in twain.
    /// Panics if `at` is out of bounds.
    #[inline]
    pub fn split_at(self, at: usize) -> (Self, Self) {
        assert!(at <= self.len as usize);

        let left = Buf {
            index: self.index,
            off: self.off,
            len: at as _,

            _non_send: PhantomData,
        };

        let right = Buf {
            index: self.index,
            off: self.off + at as u16,
            len: (self.len - at as u16),

            _non_send: PhantomData,
        };

        std::mem::forget(self); // don't decrease ref count
        BUF_POOL.with(|bp| bp.inc(left.index)); // in fact, increase it by 1

        (left, right)
    }
}

impl ops::Deref for Buf {
    type Target = [u8];

    #[inline(always)]
    fn deref(&self) -> &[u8] {
        unsafe {
            std::slice::from_raw_parts(
                BUF_POOL.with(|bp| bp.base_ptr(self.index).add(self.off as _)),
                self.len as _,
            )
        }
    }
}

impl Clone for Buf {
    fn clone(&self) -> Self {
        BUF_POOL.with(|bp| bp.inc(self.index));
        Self {
            index: self.index,
            off: self.off,
            len: self.len,
            _non_send: PhantomData,
        }
    }
}

impl Drop for Buf {
    fn drop(&mut self) {
        BUF_POOL.with(|bp| bp.dec(self.index));
    }
}

#[cfg(test)]
mod tests {
    use crate::{Buf, BufMut, BUF_POOL};
    use std::rc::Rc;

    #[test]
    fn size_test() {
        assert_eq!(8, std::mem::size_of::<BufMut>());
        assert_eq!(8, std::mem::size_of::<Buf>());
        assert_eq!(16, std::mem::size_of::<Box<[u8]>>());

        assert_eq!(16, std::mem::size_of::<&[u8]>());

        #[allow(dead_code)]
        enum BufOrBox {
            Buf(Buf),
            Box((Rc<Box<[u8]>>, u32, u32)),
        }
        assert_eq!(16, std::mem::size_of::<BufOrBox>());

        #[allow(dead_code)]
        enum Chunk {
            Buf(Buf),
            Box(Box<[u8]>),
            Static(&'static [u8]),
        }
        assert_eq!(24, std::mem::size_of::<Chunk>());
    }

    #[test]
    fn freeze_test() -> eyre::Result<()> {
        let total_bufs = BUF_POOL.with(|bp| bp.num_free())?;
        let mut bm = BufMut::alloc().unwrap();

        assert_eq!(total_bufs - 1, BUF_POOL.with(|bp| bp.num_free())?);
        assert_eq!(bm.len(), 4096);

        bm[..11].copy_from_slice(b"hello world");
        assert_eq!(&bm[..11], b"hello world");

        let b = bm.freeze();
        assert_eq!(&b[..11], b"hello world");
        assert_eq!(total_bufs - 1, BUF_POOL.with(|bp| bp.num_free())?);

        let b2 = b.clone();
        assert_eq!(&b[..11], b"hello world");
        assert_eq!(total_bufs - 1, BUF_POOL.with(|bp| bp.num_free())?);

        drop(b);
        assert_eq!(total_bufs - 1, BUF_POOL.with(|bp| bp.num_free())?);

        drop(b2);
        assert_eq!(total_bufs, BUF_POOL.with(|bp| bp.num_free())?);

        Ok(())
    }

    #[test]
    fn split_test() -> eyre::Result<()> {
        let total_bufs = BUF_POOL.with(|bp| bp.num_free())?;
        let mut bm = BufMut::alloc().unwrap();

        bm[..12].copy_from_slice(b"yellowjacket");
        let (a, b) = bm.split_at(6);

        assert_eq!(total_bufs - 1, BUF_POOL.with(|bp| bp.num_free())?);
        assert_eq!(&a[..], b"yellow");
        assert_eq!(&b[..6], b"jacket");

        drop((a, b));

        Ok(())
    }
}