use std::{
fs::{File, OpenOptions},
io, mem,
ops::{Deref, DerefMut},
os::fd::AsRawFd,
path::Path,
ptr, slice,
sync::atomic::Ordering,
};
use crate::{
stats::{COUNT_ACTIVE_SEGMENT, COUNT_FTRUNCATE_FAILED, COUNT_MMAP_FAILED, COUNT_MUNMAP_FAILED},
utils::{check_zst, page_size},
};
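/// Fixed-capacity, typed view over a memory-mapped file.
///
/// A `Segment<T>` behaves like a bounded `Vec<T>` whose storage is an
/// `mmap`-ed file region: `len` counts the initialized elements, `capacity`
/// is how many elements the mapping can hold, and `addr` is null only for
/// the empty segment returned by [`Segment::null`].
///
/// Illustrative usage (a sketch: the file path is arbitrary and the snippet
/// is not compiled as a doc-test):
///
/// ```ignore
/// let mut seg: Segment<u64> = Segment::open_rw("/tmp/segment.bin", 4)?;
/// assert!(seg.push_within_capacity(42).is_ok());
/// assert_eq!(seg.pop(), Some(42));
/// ```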
#[derive(Debug)]
pub struct Segment<T> {
pub(crate) addr: *mut T,
len: usize,
capacity: usize,
}
impl<T> Segment<T> {
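/// Returns an empty placeholder segment: null address, zero length, zero capacity.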
#[inline(always)]
pub const fn null() -> Self {
check_zst::<T>();
Self {
addr: std::ptr::null_mut(),
len: 0,
capacity: 0,
}
}
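/// Creates (or opens) the file at `path`, resizes it to hold `capacity`
/// elements of `T`, and maps it read/write. A `capacity` of 0 returns the
/// null segment without touching the filesystem.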
pub fn open_rw<P: AsRef<Path>>(path: P, capacity: usize) -> io::Result<Self> {
check_zst::<T>();
if capacity == 0 {
return Ok(Self::null());
}
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&path)?;
unsafe { ftruncate::<T>(&file, capacity) }?;
let addr = unsafe { mmap(&file, capacity) }?;
Ok(Self {
addr,
len: 0,
capacity,
})
}
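/// Number of elements the mapping can hold.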
#[inline(always)]
pub fn capacity(&self) -> usize {
self.capacity
}
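/// Shortens the segment to `new_len` elements, dropping the tail in place.
/// Does nothing if `new_len` is greater than the current length.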
pub fn truncate(&mut self, new_len: usize) {
if new_len > self.len {
return;
}
unsafe {
let remaining_len = self.len - new_len;
let items = ptr::slice_from_raw_parts_mut(self.addr.add(new_len), remaining_len);
self.set_len(new_len);
ptr::drop_in_place(items);
}
}
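/// Drops the first `delete_count` elements and moves the remaining ones to
/// the front of the mapping. Clears the whole segment when `delete_count`
/// is at least the current length.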
pub fn truncate_first(&mut self, delete_count: usize) {
let new_len = self.len.saturating_sub(delete_count);
if new_len == 0 {
self.clear()
} else {
unsafe {
let items = slice::from_raw_parts_mut(self.addr, delete_count);
ptr::drop_in_place(items);
ptr::copy(self.addr.add(delete_count), self.addr, new_len);
self.set_len(new_len);
}
}
}
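/// Drops every element and resets the length to zero; the mapping and its
/// capacity are kept.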
#[inline]
pub fn clear(&mut self) {
unsafe {
let items = slice::from_raw_parts_mut(self.addr, self.len);
self.set_len(0);
ptr::drop_in_place(items);
}
}
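/// Forces the length of the segment to `new_len`.
///
/// # Safety
///
/// `new_len` must not exceed `capacity()`, and the first `new_len` elements
/// must be initialized.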
#[allow(clippy::missing_safety_doc)]
#[inline(always)]
pub unsafe fn set_len(&mut self, new_len: usize) {
debug_assert!(new_len <= self.capacity());
self.len = new_len;
}
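/// Size of the backing file in bytes (`capacity * size_of::<T>()`).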
#[inline(always)]
pub fn disk_size(&self) -> usize {
self.capacity * mem::size_of::<T>()
}
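/// Appends `value` if spare capacity remains, otherwise hands it back as
/// `Err(value)`. The segment never grows.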
#[inline]
pub fn push_within_capacity(&mut self, value: T) -> Result<(), T> {
if self.len == self.capacity {
return Err(value);
}
unsafe {
let dst = self.addr.add(self.len);
ptr::write(dst, value);
}
self.len += 1;
Ok(())
}
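/// Removes and returns the last element, or `None` if the segment is empty.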
#[inline]
pub fn pop(&mut self) -> Option<T> {
if self.len == 0 {
return None;
}
self.len -= 1;
unsafe {
let src = self.addr.add(self.len);
Some(ptr::read(src))
}
}
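/// Moves every element of `other` to the end of `self`, leaving `other`
/// empty. Panics if the combined length exceeds `self.capacity()`.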
pub fn extend_from_segment(&mut self, mut other: Segment<T>) {
let new_len = other.len + self.len;
assert!(
new_len <= self.capacity,
"New segment is too small: new_len={}, capacity={}",
new_len,
self.capacity
);
unsafe {
ptr::copy_nonoverlapping(other.addr, self.addr.add(self.len), other.len);
self.set_len(new_len);
other.set_len(0);
};
}
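/// Hints the kernel via `madvise(MADV_WILLNEED)` that the whole initialized
/// range will be read soon. No-op for empty or null segments.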
pub fn advice_prefetch_all_pages(&self) {
if self.addr.is_null() || self.len == 0 {
return;
}
let madvise_code = unsafe {
libc::madvise(
self.addr.cast(),
self.len * mem::size_of::<T>(),
libc::MADV_WILLNEED,
)
};
assert_eq!(
madvise_code,
0,
"madvise error: {}",
io::Error::last_os_error()
);
}
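/// Hints the kernel that the page containing element `index` will be read
/// soon. Out-of-range indices and null segments are ignored.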
pub fn advice_prefetch_page_at(&self, index: usize) {
if self.addr.is_null() || index >= self.len {
return;
}
let page_size = page_size();
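// Round the element's address down to the start of its page (page sizes are powers of two).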
let page_mask = !(page_size - 1);
let madvise_code = unsafe {
libc::madvise(
(self.addr.add(index) as usize & page_mask) as *mut libc::c_void,
page_size,
libc::MADV_WILLNEED,
)
};
assert_eq!(
madvise_code,
0,
"madvise error: {}",
io::Error::last_os_error()
);
}
}
impl<T> Deref for Segment<T> {
type Target = [T];
#[inline(always)]
fn deref(&self) -> &Self::Target {
if self.addr.is_null() {
return &[];
}
unsafe { slice::from_raw_parts(self.addr, self.len) }
}
}
impl<T> DerefMut for Segment<T> {
#[inline(always)]
fn deref_mut(&mut self) -> &mut Self::Target {
if self.addr.is_null() {
return &mut [];
}
unsafe { slice::from_raw_parts_mut(self.addr, self.len) }
}
}
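// Dropping a segment drops any remaining initialized elements, then unmaps the file-backed region.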
impl<T> Drop for Segment<T> {
fn drop(&mut self) {
if self.len > 0 {
unsafe { ptr::drop_in_place(ptr::slice_from_raw_parts_mut(self.addr, self.len)) }
}
if !self.addr.is_null() {
let _ = unsafe { munmap(self.addr, self.capacity) };
}
}
}
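// SAFETY: a `Segment<T>` owns its mapping and the `T` values stored in it,
// so it can be sent or shared across threads exactly when `T` can.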
unsafe impl<T: Send> Send for Segment<T> {}
unsafe impl<T: Sync> Sync for Segment<T> {}
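/// Resizes `file` to hold `capacity` elements of `T`, bumping the failure
/// counter when `ftruncate(2)` reports an error.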
unsafe fn ftruncate<T>(file: &File, capacity: usize) -> io::Result<()> {
check_zst::<T>();
let segment_size = capacity * mem::size_of::<T>();
let fd = file.as_raw_fd();
if libc::ftruncate(fd, segment_size as libc::off_t) != 0 {
COUNT_FTRUNCATE_FAILED.fetch_add(1, Ordering::Relaxed);
Err(io::Error::last_os_error())
} else {
Ok(())
}
}
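/// Maps `capacity * size_of::<T>()` bytes of `file` as shared read/write
/// memory, updating the mmap counters.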
unsafe fn mmap<T>(file: &File, capacity: usize) -> io::Result<*mut T> {
check_zst::<T>();
let segment_size = capacity * mem::size_of::<T>();
let fd = file.as_raw_fd();
let addr = libc::mmap(
std::ptr::null_mut(),
segment_size as libc::size_t,
libc::PROT_READ | libc::PROT_WRITE,
libc::MAP_SHARED,
fd,
0,
);
if addr == libc::MAP_FAILED {
COUNT_MMAP_FAILED.fetch_add(1, Ordering::Relaxed);
Err(io::Error::last_os_error())
} else {
COUNT_ACTIVE_SEGMENT.fetch_add(1, Ordering::Relaxed);
Ok(addr.cast())
}
}
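/// Unmaps a region previously obtained from [`mmap`], updating the counters.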
unsafe fn munmap<T>(addr: *mut T, capacity: usize) -> io::Result<()> {
check_zst::<T>();
debug_assert!(!addr.is_null());
debug_assert!(capacity > 0);
let unmap_code = libc::munmap(addr.cast(), capacity * mem::size_of::<T>());
if unmap_code != 0 {
COUNT_MUNMAP_FAILED.fetch_add(1, Ordering::Relaxed);
Err(io::Error::last_os_error())
} else {
COUNT_ACTIVE_SEGMENT.fetch_sub(1, Ordering::Relaxed);
Ok(())
}
}