use std::{
fs::{self, OpenOptions},
io, mem,
ops::{Deref, DerefMut},
os::{fd::AsRawFd, unix::prelude::FileExt},
path::{Path, PathBuf},
ptr, slice,
sync::atomic::Ordering,
};
use crate::stats::{COUNT_ACTIVE_SEGMENT, COUNT_MMAP_FAILED, COUNT_MUNMAP_FAILED};
/// A typed, file-backed `mmap` region used as a fixed-capacity vector.
///
/// NOTE(review): `Drop` deletes the backing file, so this looks like
/// scratch/spill storage rather than a persistence format — confirm with
/// callers before relying on the file surviving.
#[derive(Debug)]
pub struct Segment<T> {
    // Base address of the mapped region; null for `Segment::null()`.
    addr: *mut T,
    // Number of initialized items (logical length), always <= `capacity`.
    len: usize,
    // Maximum number of items the mapping can hold; 0 means "never mapped".
    capacity: usize,
    // Backing file removed on drop; `None` for null segments.
    path: Option<PathBuf>,
}
impl<T> Segment<T> {
    /// Creates an empty, unmapped segment.
    ///
    /// The returned segment owns no memory and no backing file: `addr` is
    /// null and both `len` and `capacity` are zero, so `Drop` is a no-op.
    pub const fn null() -> Self {
        Self {
            addr: std::ptr::null_mut(),
            len: 0,
            capacity: 0,
            path: None,
        }
    }

    /// Creates (or reuses) the file at `path`, grows it to hold `capacity`
    /// items of `T`, and memory-maps it read/write.
    ///
    /// Returns `Segment::null()` when `capacity` is zero. On success the
    /// segment starts logically empty (`len == 0`) even though the whole
    /// mapping is reserved up front.
    ///
    /// # Errors
    /// Fails with `InvalidInput` if the byte size overflows `usize` or if
    /// `T` is zero-sized (a zero-byte mapping is not representable), and
    /// propagates any file or `mmap` failure.
    pub fn open_rw<P: AsRef<Path>>(path: P, capacity: usize) -> io::Result<Self> {
        if capacity == 0 {
            return Ok(Self::null());
        }
        // Checked size computation: the unchecked multiply could overflow,
        // and a zero-sized `T` would make `segment_size - 1` underflow below.
        let segment_size = capacity
            .checked_mul(mem::size_of::<T>())
            .ok_or_else(|| {
                io::Error::new(io::ErrorKind::InvalidInput, "segment byte size overflows usize")
            })?;
        if segment_size == 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "cannot map a segment of zero-sized items",
            ));
        }
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&path)?;
        // Extend the file to the mapped size by writing one byte at the last
        // offset; accessing a MAP_SHARED page past EOF would raise SIGBUS.
        file.write_at(&[0], (segment_size - 1) as u64)?;
        let fd = file.as_raw_fd();
        let offset = 0;
        // SAFETY: plain FFI call with valid arguments; `fd` stays open for
        // the duration of the call, and POSIX keeps a shared mapping alive
        // after the descriptor is closed.
        let addr = unsafe {
            libc::mmap(
                std::ptr::null_mut(),
                segment_size as libc::size_t,
                libc::PROT_READ | libc::PROT_WRITE,
                libc::MAP_SHARED,
                fd,
                offset,
            )
        };
        if addr == libc::MAP_FAILED {
            COUNT_MMAP_FAILED.fetch_add(1, Ordering::Relaxed);
            Err(io::Error::last_os_error())
        } else {
            COUNT_ACTIVE_SEGMENT.fetch_add(1, Ordering::Relaxed);
            Ok(Self {
                addr: addr.cast(),
                len: 0,
                capacity,
                path: Some(path.as_ref().to_path_buf()),
            })
        }
    }

    /// Maximum number of items the mapping can hold.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Shortens the segment to `new_len` items, dropping the tail in place.
    /// Like `Vec::truncate`, a `new_len >= len` is a no-op (the `>=` also
    /// keeps a null `Segment::null()` pointer out of `drop_in_place`).
    pub fn truncate(&mut self, new_len: usize) {
        if new_len >= self.len {
            return;
        }
        unsafe {
            let remaining_len = self.len - new_len;
            let items = ptr::slice_from_raw_parts_mut(self.addr.add(new_len), remaining_len);
            // Shrink `len` before dropping so a panicking destructor leaks
            // the tail instead of allowing a double drop.
            self.set_len(new_len);
            ptr::drop_in_place(items);
        }
    }

    /// Drops every item and resets the length to zero. The mapping itself
    /// stays allocated for reuse.
    pub fn clear(&mut self) {
        // Early return keeps the null pointer of `Segment::null()` away
        // from `slice::from_raw_parts_mut`, which requires non-null.
        if self.len == 0 {
            return;
        }
        unsafe {
            let items = slice::from_raw_parts_mut(self.addr, self.len);
            // Reset `len` first: a panic mid-drop then leaks, not double-drops.
            self.set_len(0);
            ptr::drop_in_place(items);
        }
    }

    /// Sets the logical length without touching the underlying memory.
    ///
    /// # Safety
    /// The first `new_len` items must be initialized, and `new_len` must not
    /// exceed `capacity`.
    pub unsafe fn set_len(&mut self, new_len: usize) {
        debug_assert!(new_len <= self.capacity());
        self.len = new_len;
    }

    /// Size in bytes of the backing file / mapping (full capacity, not `len`).
    pub fn disk_size(&self) -> usize {
        self.capacity * mem::size_of::<T>()
    }

    /// Appends `value`, or hands it back as `Err(value)` when the segment
    /// is full. Never reallocates.
    pub fn push_within_capacity(&mut self, value: T) -> Result<(), T> {
        if self.len == self.capacity {
            return Err(value);
        }
        unsafe {
            let dst = self.addr.add(self.len);
            ptr::write(dst, value);
        }
        self.len += 1;
        Ok(())
    }

    /// Removes and returns the last item, or `None` when empty.
    pub fn pop(&mut self) -> Option<T> {
        if self.len == 0 {
            return None;
        }
        self.len -= 1;
        unsafe {
            let src = self.addr.add(self.len);
            Some(ptr::read(src))
        }
    }

    /// Moves the contents of `other` into this (empty) segment.
    ///
    /// Only the initialized prefix (`other.len` items) is copied — copying
    /// `other.capacity` items would read uninitialized memory past the
    /// logical length. `other` is left logically empty so the moved items
    /// are not dropped twice when it goes out of scope.
    ///
    /// # Panics
    /// Panics if `self` already holds data or if `other`'s items do not fit
    /// in `self`'s capacity.
    pub fn fill_from(&mut self, mut other: Segment<T>) {
        assert!(self.len == 0, "New segment contains already some data");
        assert!(
            other.len <= self.capacity,
            "Copy segment size error (src: {}, dst: {})",
            other.len,
            self.capacity
        );
        // SAFETY: both pointers are valid for `other.len` items (a count of
        // zero is always allowed), the destination fits within capacity, and
        // resetting `other.len` transfers ownership of the moved items.
        unsafe {
            ptr::copy(other.addr, self.addr, other.len);
            self.set_len(other.len);
            other.set_len(0);
        };
    }
}
impl<T> Deref for Segment<T> {
    type Target = [T];

    /// Views the initialized prefix of the mapping as a slice.
    fn deref(&self) -> &Self::Target {
        // `Segment::null()` has a null `addr`; `slice::from_raw_parts`
        // requires a non-null, aligned pointer even for an empty slice,
        // so the null case must be handled explicitly.
        if self.addr.is_null() {
            return &[];
        }
        // SAFETY: `addr` is a live mapping holding `len` initialized items.
        unsafe { slice::from_raw_parts(self.addr, self.len) }
    }
}
impl<T> DerefMut for Segment<T> {
    /// Mutably views the initialized prefix of the mapping as a slice.
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Same null-pointer guard as `Deref`: `slice::from_raw_parts_mut`
        // demands a non-null, aligned pointer even when `len` is zero.
        if self.addr.is_null() {
            return &mut [];
        }
        // SAFETY: `addr` is a live mapping holding `len` initialized items,
        // and `&mut self` guarantees exclusive access.
        unsafe { slice::from_raw_parts_mut(self.addr, self.len) }
    }
}
impl<T> Drop for Segment<T> {
    /// Tears down the segment: drops the contained items, unmaps the
    /// region, and removes the backing file — in that order, since the
    /// items live inside the mapping.
    fn drop(&mut self) {
        // Run the items' destructors while the mapping is still valid.
        if self.len > 0 {
            unsafe { ptr::drop_in_place(ptr::slice_from_raw_parts_mut(self.addr, self.len)) }
        }
        // A zero-capacity segment (`Segment::null()`) was never mapped.
        if self.capacity > 0 {
            assert!(!self.addr.is_null());
            let unmap_code =
                unsafe { libc::munmap(self.addr.cast(), self.capacity * mem::size_of::<T>()) };
            if unmap_code == 0 {
                COUNT_ACTIVE_SEGMENT.fetch_sub(1, Ordering::Relaxed);
            } else {
                // Keep going even if munmap fails; just record it.
                COUNT_MUNMAP_FAILED.fetch_add(1, Ordering::Relaxed);
            }
        }
        // Best-effort cleanup: a failed unlink is deliberately ignored.
        if let Some(path) = &self.path {
            let _ = fs::remove_file(path);
        }
    }
}
// SAFETY: `Segment<T>` owns its items like `Vec<T>` does, so it may move
// between threads only when `T` itself can. The previous unconditional impl
// was unsound: it allowed e.g. `Segment<Rc<_>>` to cross threads.
unsafe impl<T: Send> Send for Segment<T> {}
// SAFETY: sharing `&Segment<T>` only exposes `&T` (via `Deref`), which is
// sound exactly when `T: Sync` — mirroring `Vec<T>`. The unconditional impl
// was unsound for non-`Sync` item types.
unsafe impl<T: Sync> Sync for Segment<T> {}