use std::alloc::{self, Layout};
use crate::sync::{Arc, Mutex};
use super::stats::{BufferKind, SizePoolStats};
/// Handle to one page of pool memory that is split into fixed-size buffers.
///
/// The handle is cheap to clone: every clone points at the same [`PageInner`] through an
/// `Arc`, so the backing allocation is freed only when the last handle is dropped.
#[derive(Debug, Clone)]
pub struct Page {
// Shared state of the page (allocation, reservation bitmask, statistics).
inner: Arc<PageInner>,
}
/// Shared state behind a [`Page`]: the raw allocation plus its bookkeeping.
#[derive(Debug)]
struct PageInner {
// Start of the allocation obtained in `Page::new`; also the address of buffer 0.
bytes: *mut u8,
// Address of the last buffer in the page; upper bound for the range check in `Page::release`.
last_buffer: *mut u8,
// Bit `i` is set while buffer `i` is reserved. `Page::invalidate_if_empty` sets every bit so
// that no further reservation can succeed.
reserved_bitmask: Mutex<u16>,
// Layout passed to `alloc::alloc`; the same value is handed to `alloc::dealloc` on drop.
layout: Layout,
// Statistics handle; supplies `buffer_size` and receives the page/buffer counter updates.
stats: Arc<SizePoolStats>,
}
// SAFETY: the raw pointers are written once in `Page::new` and never reassigned in this file,
// and the only mutable bookkeeping (`reserved_bitmask`) is guarded by a `Mutex`.
// NOTE(review): soundness also depends on how holders of the handed-out buffer pointers access
// the memory — confirm against the callers of `PagedBufferPtr::as_raw_ptr`.
unsafe impl Send for PageInner {}
// SAFETY: see the note on the `Send` impl above; shared access only reads the pointers and
// locks the bitmask.
unsafe impl Sync for PageInner {}
impl Drop for PageInner {
fn drop(&mut self) {
unsafe {
alloc::dealloc(self.bytes, self.layout);
}
}
}
impl Page {
    /// Number of buffers carved out of one page: one per bit of the `u16` reservation bitmask.
    const BUFFERS_PER_PAGE: usize = u16::BITS as usize;

    /// Allocates a page large enough for `BUFFERS_PER_PAGE` buffers of `stats.buffer_size` bytes.
    ///
    /// The new page is counted in the `pool.allocated_pages` gauge and registered with `stats`
    /// as an empty page.
    ///
    /// # Panics
    ///
    /// Panics if `stats.buffer_size` is zero or if the page layout cannot be computed. A failed
    /// allocation is handed to `alloc::handle_alloc_error`.
    pub fn new(stats: Arc<SizePoolStats>) -> Self {
        let buffer_size = stats.buffer_size;
        assert_ne!(buffer_size, 0);

        let page_layout = Layout::array::<[u8; Self::BUFFERS_PER_PAGE]>(buffer_size).unwrap();
        // SAFETY: `buffer_size` is non-zero (asserted above), so the layout has a non-zero size.
        let base = unsafe { alloc::alloc(page_layout) };
        if base.is_null() {
            alloc::handle_alloc_error(page_layout);
        }

        metrics::gauge!("pool.allocated_pages", "size" => format!("{}", buffer_size)).increment(1.0);
        stats.add_empty_page();

        // SAFETY: the offset is the start of the final buffer, which lies inside the allocation
        // of `BUFFERS_PER_PAGE * buffer_size` bytes made above.
        let last_buffer = unsafe { base.add(buffer_size * (Self::BUFFERS_PER_PAGE - 1)) };

        Self {
            inner: Arc::new(PageInner {
                bytes: base,
                last_buffer,
                reserved_bitmask: Default::default(),
                layout: page_layout,
                stats,
            }),
        }
    }

    /// Reserves the lowest-indexed free buffer of this page for `kind`.
    ///
    /// Returns `None` when every bit of the bitmask is set, i.e. the page is full or has been
    /// invalidated. The returned pointer holds a clone of this page handle.
    pub fn try_reserve(&self, kind: BufferKind) -> Option<PagedBufferPtr> {
        // The guard is a temporary of this statement, so the lock is released before any of the
        // statistics below are updated.
        let claimed = consume(&mut self.inner.reserved_bitmask.lock().unwrap());
        let (index, page_status) = claimed?;

        let stats = &self.inner.stats;
        if matches!(page_status, PageStatus::Empty) {
            // The page just received its first reservation.
            stats.remove_empty_page();
        }
        stats.add_reserved_buffer(kind);

        // SAFETY: `consume` yields an index below `BUFFERS_PER_PAGE`, so the resulting address
        // is the start of a buffer inside the page allocation.
        let ptr = unsafe { self.inner.bytes.add(index * stats.buffer_size) };
        Some(PagedBufferPtr {
            ptr,
            pool_page: self.clone(),
            kind,
        })
    }

    /// Marks the buffer starting at `ptr` as free again and updates the statistics.
    ///
    /// # Panics
    ///
    /// Panics if `ptr` lies outside the address range spanned by this page's buffers.
    fn release(&self, ptr: *mut u8, kind: BufferKind) {
        let first = self.inner.bytes;
        let last = self.inner.last_buffer;
        assert!(
            first <= ptr && ptr <= last,
            "the pointer does not belong to this page"
        );

        // SAFETY: the assertion above places `ptr` within this page's address range; within this
        // file it is only ever produced by `try_reserve` from `bytes`.
        let byte_offset = unsafe { ptr.offset_from(first) };
        let index = byte_offset as usize / self.inner.stats.buffer_size;

        // Statistics are updated while the bitmask lock is still held.
        let mut reserved = self.inner.reserved_bitmask.lock().unwrap();
        *reserved &= !(1u16 << index);
        self.inner.stats.remove_reserved_buffer(kind);
        if *reserved == 0 {
            self.inner.stats.add_empty_page();
        }
    }

    /// Invalidates the page if no buffer is currently reserved.
    ///
    /// On success the page is removed from the empty-page count and every bitmask bit is set, so
    /// all later `try_reserve` calls return `None`. Returns `false` and changes nothing when at
    /// least one buffer is still reserved.
    pub fn invalidate_if_empty(&self) -> bool {
        let mut reserved = self.inner.reserved_bitmask.lock().unwrap();
        let empty = *reserved == 0;
        if empty {
            self.inner.stats.remove_empty_page();
            *reserved = FULL_MASK;
        }
        empty
    }

    /// Reports whether no buffer of this page is reserved (test helper).
    #[cfg(test)]
    fn is_empty(&self) -> bool {
        *self.inner.reserved_bitmask.lock().unwrap() == 0
    }

    /// Builds a page backed by freshly created default statistics (test helper).
    #[cfg(test)]
    pub(super) fn new_for_tests(buffer_size: usize) -> Page {
        let pool_stats = Arc::new(super::stats::PoolStats::default());
        let size_stats = SizePoolStats::new(buffer_size, pool_stats);
        Page::new(Arc::new(size_stats))
    }
}
/// Bitmask value in which every buffer of a page is marked as reserved.
const FULL_MASK: u16 = u16::MAX;

/// Reserves the lowest-indexed free slot recorded in `bitmask`.
///
/// On success the matching bit is set and the slot index is returned together with whether the
/// bitmask was zero before the call. Returns `None`, leaving `bitmask` untouched, when every bit
/// is already set.
fn consume(bitmask: &mut u16) -> Option<(usize, PageStatus)> {
    let current = *bitmask;
    if current == FULL_MASK {
        return None;
    }

    let page_status = match current {
        0 => PageStatus::Empty,
        _ => PageStatus::NonEmpty,
    };

    // The count of trailing set bits is the position of the lowest clear bit; it is below 16
    // here because the full mask was ruled out above.
    let index = current.trailing_ones() as usize;
    *bitmask = current | (1u16 << index);
    Some((index, page_status))
}

/// State of a page's bitmask immediately before a slot was consumed.
#[derive(Debug)]
enum PageStatus {
    /// No slot was reserved.
    Empty,
    /// At least one slot was already reserved.
    NonEmpty,
}
/// A buffer reserved from a [`Page`]; dropping it releases the slot back to that page.
#[derive(Debug)]
pub(super) struct PagedBufferPtr {
// Address of the first byte of the reserved buffer.
ptr: *mut u8,
// Handle to the owning page; keeps the page allocation alive while the buffer exists.
pool_page: Page,
// Kind the buffer was reserved under; passed back to the page statistics on release.
kind: BufferKind,
}
// SAFETY: `ptr` addresses a slot that `Page::try_reserve` marked as reserved for this value
// alone, and `pool_page` keeps the allocation alive.
// NOTE(review): assumes no other code retains a copy of the raw pointer across threads — confirm
// against the users of `as_raw_ptr`.
unsafe impl Send for PagedBufferPtr {}
impl Drop for PagedBufferPtr {
    /// Hands the buffer's slot back to the page it was reserved from.
    fn drop(&mut self) {
        let Self { ptr, pool_page, kind } = self;
        pool_page.release(*ptr, *kind);
    }
}
impl PagedBufferPtr {
    /// Returns the address of the first byte of the reserved buffer.
    pub fn as_raw_ptr(&self) -> *mut u8 {
        let Self { ptr, .. } = self;
        *ptr
    }

    /// Returns the buffer length in bytes, taken from the owning page's `buffer_size`.
    pub fn size(&self) -> usize {
        let stats = &self.pool_page.inner.stats;
        stats.buffer_size
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A page hands out exactly `BUFFERS_PER_PAGE` buffers, reuses a released slot, and is empty
    /// again once every buffer has been dropped.
    #[test]
    fn test_reserve_buffers() {
        let page = Page::new_for_tests(1024);

        let mut buffers: Vec<_> = (0..Page::BUFFERS_PER_PAGE)
            .map(|_| {
                page.try_reserve(BufferKind::Other)
                    .expect("reserving up to 16 buffers from a new page should succeed")
            })
            .collect();
        assert!(
            page.try_reserve(BufferKind::Other).is_none(),
            "reserving the 17th buffer from a page should return None"
        );

        // Free a single slot, then take it again.
        let released = buffers.pop().expect("drop one of the reserved buffers");
        drop(released);
        let replacement = page
            .try_reserve(BufferKind::Other)
            .expect("reserving after dropping 1 buffer should succeed");
        buffers.push(replacement);
        assert!(
            page.try_reserve(BufferKind::Other).is_none(),
            "reserving when all 16 buffers are in use should return None"
        );

        // Dropping every buffer must leave the page without reservations.
        drop(buffers);
        assert!(page.is_empty());
    }

    /// A never-used page can be invalidated and refuses reservations afterwards.
    #[test]
    fn test_invalidate_new() {
        let page = Page::new_for_tests(1024);

        let invalidated = page.invalidate_if_empty();
        assert!(invalidated, "invalidation of a new, empty page should succeed");

        let reservation = page.try_reserve(BufferKind::Other);
        assert!(
            reservation.is_none(),
            "reserving from an invalidated page should return None"
        );
    }

    /// Invalidation fails while a buffer is outstanding and succeeds once it has been dropped.
    #[test]
    fn test_invalidate_in_use() {
        let page = Page::new_for_tests(1024);

        let buffer_ptr = page
            .try_reserve(BufferKind::Other)
            .expect("reserving from a new page should succeed");
        let invalidated_while_in_use = page.invalidate_if_empty();
        assert!(
            !invalidated_while_in_use,
            "invalidation of a page in use should fail"
        );

        drop(buffer_ptr);
        let invalidated_after_release = page.invalidate_if_empty();
        assert!(
            invalidated_after_release,
            "invalidation of an empty page should succeed"
        );

        let reservation = page.try_reserve(BufferKind::Other);
        assert!(
            reservation.is_none(),
            "reserving from an invalidated page should return None"
        );
    }
}