#![allow(clippy::partialeq_ne_impl)]
#![warn(missing_docs)]
#![deny(clippy::unwrap_used)]
use std::{
fs, io, mem,
ops::{Deref, DerefMut},
path::PathBuf,
};
#[cfg(feature = "serde")]
use std::marker::PhantomData;
pub use segment::Segment;
pub use segment_builder::{DefaultSegmentBuilder, SegmentBuilder};
pub use stats::MmapStats;
use utils::check_zst;
pub use vec_builder::MmapVecBuilder;
#[cfg(feature = "serde")]
use serde::{
de::{SeqAccess, Visitor},
ser::SerializeSeq,
Deserialize, Deserializer, Serialize, Serializer,
};
use crate::utils::page_size;
mod segment;
mod segment_builder;
mod stats;
mod utils;
mod vec_builder;
/// A vector of `T` whose elements are stored in a [`Segment`] opened on a file
/// (memory-mapped, as the type name suggests) instead of the heap.
///
/// The vector dereferences to `[T]`, so every slice method is available on it.
/// The file at `path` is removed (best effort) when the vector is dropped.
///
/// `B` is the [`SegmentBuilder`] used to pick the path of the backing file.
#[derive(Debug)]
pub struct MmapVec<T, B: SegmentBuilder = DefaultSegmentBuilder> {
/// Segment currently holding the elements; a null segment until storage is reserved.
pub(crate) segment: Segment<T>,
/// Builder that generated `path` and that is reused when the vector is cloned.
pub(crate) builder: B,
/// Path of the file the segment is opened on; deleted on drop.
pub(crate) path: PathBuf,
}
impl<T, B> MmapVec<T, B>
where
    B: SegmentBuilder,
{
    /// Creates an empty vector backed by a null segment.
    ///
    /// No segment is opened here: the builder is default-constructed and only
    /// asked for the path that [`reserve`](Self::reserve) will open later.
    #[inline(always)]
    pub fn new() -> Self {
        // NOTE(review): presumably rejects zero-sized `T` — confirm in `utils::check_zst`.
        check_zst::<T>();

        let builder = B::default();
        let path = builder.new_segment_path();
        Self {
            segment: Segment::null(),
            builder,
            path,
        }
    }

    /// Creates a vector with room for `capacity` elements.
    ///
    /// Shortcut for `MmapVecBuilder::new().capacity(capacity).try_build()`.
    ///
    /// # Errors
    ///
    /// Returns whatever I/O error the builder reports.
    #[inline(always)]
    pub fn with_capacity(capacity: usize) -> io::Result<Self> {
        MmapVecBuilder::new().capacity(capacity).try_build()
    }

    /// Number of elements the current segment can hold without growing.
    #[inline(always)]
    pub fn capacity(&self) -> usize {
        self.segment.capacity()
    }

    /// Size reported by the underlying segment's `disk_size()`.
    #[inline(always)]
    pub fn disk_size(&self) -> usize {
        self.segment.disk_size()
    }

    /// Forwards to `Segment::truncate`.
    ///
    /// Presumably shortens the vector to `new_len` elements, like `Vec::truncate`.
    #[inline(always)]
    pub fn truncate(&mut self, new_len: usize) {
        self.segment.truncate(new_len);
    }

    /// Forwards to `Segment::truncate_first`.
    ///
    /// Presumably removes the first `delete_count` elements — confirm in `Segment`.
    #[inline(always)]
    pub fn truncate_first(&mut self, delete_count: usize) {
        self.segment.truncate_first(delete_count);
    }

    /// Forwards to `Segment::clear`.
    #[inline(always)]
    pub fn clear(&mut self) {
        self.segment.clear();
    }

    /// Forwards to `Segment::pop`, which yields `Some(value)` or `None`.
    #[inline(always)]
    pub fn pop(&mut self) -> Option<T> {
        self.segment.pop()
    }

    /// Appends `value`, growing the backing segment first when it is full.
    ///
    /// When growth is needed the vector reserves `max(len, elements per page)`
    /// additional slots, and always at least one.
    ///
    /// # Errors
    ///
    /// Returns the error from [`reserve`](Self::reserve) if the segment cannot grow.
    ///
    /// # Panics
    ///
    /// Panics if the value is still rejected after a successful reservation.
    pub fn push(&mut self, value: T) -> Result<(), io::Error> {
        if self.capacity() == self.len() {
            // `page_size() / size_of::<T>()` is 0 for elements larger than a page.
            // Without the clamp an empty vector of such elements would reserve
            // nothing and hit the assertion below.
            let min_capacity = (page_size() / mem::size_of::<T>()).max(1);
            let additional = self.len().max(min_capacity);
            self.reserve(additional)?;
        }

        assert!(
            self.push_within_capacity(value).is_ok(),
            "Fail to push to newly created segment"
        );
        Ok(())
    }

    /// Appends `value` only if spare capacity exists; never grows the segment.
    ///
    /// Forwards to `Segment::push_within_capacity`; a rejected value is handed
    /// back in `Err`.
    #[inline(always)]
    pub fn push_within_capacity(&mut self, value: T) -> Result<(), T> {
        self.segment.push_within_capacity(value)
    }

    /// Ensures there is room for `additional` more elements.
    ///
    /// When the current capacity is too small, the requested capacity is rounded
    /// up to a whole number of pages worth of elements and a new, bigger segment
    /// is opened on the same path.
    ///
    /// # Errors
    ///
    /// * `io::ErrorKind::InvalidInput` if the requested capacity overflows `usize`.
    /// * Any error returned by `Segment::open_rw`.
    pub fn reserve(&mut self, additional: usize) -> Result<(), io::Error> {
        let overflow =
            || io::Error::new(io::ErrorKind::InvalidInput, "MmapVec capacity overflow");

        let current_len = self.len();
        let requested = current_len.checked_add(additional).ok_or_else(overflow)?;
        if self.capacity() >= requested {
            return Ok(());
        }

        // Elements larger than a page give a quotient of 0; treat that as one
        // element per rounding step so the modulo below cannot divide by zero.
        let page_capacity = (page_size() / mem::size_of::<T>()).max(1);
        let new_capacity = match requested % page_capacity {
            0 => requested,
            remainder => requested
                .checked_add(page_capacity - remainder)
                .ok_or_else(overflow)?,
        };
        assert!(new_capacity > self.segment.capacity());

        // Open the same file again with the bigger capacity.
        let new_segment = Segment::<T>::open_rw(&self.path, new_capacity)?;
        debug_assert!(new_segment.capacity() > self.segment.capacity());

        let mut old_segment = mem::replace(&mut self.segment, new_segment);
        assert_ne!(old_segment.addr, self.segment.addr);

        // SAFETY (review note): both segments are opened on `self.path`, so the new
        // one is expected to already expose the first `current_len` elements.
        // The old length is zeroed so dropping `old_segment` does not drop those
        // elements a second time — confirm against `Segment::set_len` and its `Drop`.
        unsafe {
            old_segment.set_len(0);
            self.segment.set_len(current_len);
        }

        Ok(())
    }

    /// Forwards to `Segment::advice_prefetch_all_pages`.
    #[inline(always)]
    pub fn advice_prefetch_all_pages(&self) {
        self.segment.advice_prefetch_all_pages()
    }

    /// Forwards to `Segment::advice_prefetch_page_at` for the given `index`.
    #[inline(always)]
    pub fn advice_prefetch_page_at(&self, index: usize) {
        self.segment.advice_prefetch_page_at(index)
    }

    /// Returns an owned copy of the path of the backing file.
    pub fn path(&self) -> PathBuf {
        self.path.clone()
    }
}
impl<T, B> MmapVec<T, B>
where
    B: SegmentBuilder + Clone,
    T: Clone,
{
    /// Creates an independent copy of the vector in a fresh segment file.
    ///
    /// The copy always carries a clone of this vector's builder, including when
    /// the vector is empty, and gets a new path from that builder. A non-empty
    /// copy is given the same capacity as the original.
    ///
    /// # Errors
    ///
    /// Returns the error from `Segment::open_rw` if the new segment cannot be opened.
    ///
    /// # Panics
    ///
    /// Panics if the new segment rejects an element despite having the same capacity.
    pub fn try_clone(&self) -> io::Result<Self> {
        // Build the owner first: if opening the segment fails or an element's
        // `clone()` panics, dropping `copy` removes the file it may have created.
        let mut copy = Self {
            builder: self.builder.clone(),
            segment: Segment::null(),
            path: self.builder.new_segment_path(),
        };
        if self.is_empty() {
            return Ok(copy);
        }

        copy.segment = Segment::open_rw(&copy.path, self.capacity())?;
        for row in self.iter() {
            assert!(
                copy.segment.push_within_capacity(row.clone()).is_ok(),
                "Fail to push to newly cloned segment"
            );
        }
        Ok(copy)
    }
}
impl<T, B> Default for MmapVec<T, B>
where
B: SegmentBuilder,
{
#[inline(always)]
fn default() -> Self {
Self::new()
}
}
/// Read-only slice view over the elements held by the segment.
impl<T, B> Deref for MmapVec<T, B>
where
    B: SegmentBuilder,
{
    type Target = [T];

    #[inline(always)]
    fn deref(&self) -> &Self::Target {
        // Deref coercion turns `&Segment<T>` into the slice it exposes.
        &self.segment
    }
}
/// Mutable slice view over the elements held by the segment.
impl<T, B> DerefMut for MmapVec<T, B>
where
    B: SegmentBuilder,
{
    #[inline(always)]
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Deref coercion turns `&mut Segment<T>` into the mutable slice it exposes.
        &mut self.segment
    }
}
/// Two vectors compare equal when their element slices compare equal.
///
/// The builders and backing paths take no part in the comparison, so vectors
/// with different builder types can be compared.
impl<T, U, B1, B2> PartialEq<MmapVec<U, B2>> for MmapVec<T, B1>
where
    B1: SegmentBuilder,
    B2: SegmentBuilder,
    T: PartialEq<U>,
{
    #[inline(always)]
    fn eq(&self, other: &MmapVec<U, B2>) -> bool {
        // `**self` goes through `Deref` down to the `[T]` slice.
        **self == **other
    }

    #[inline(always)]
    fn ne(&self, other: &MmapVec<U, B2>) -> bool {
        **self != **other
    }
}
// `Eq` is a marker trait with no methods: it only promises that the slice
// comparison done by the `PartialEq` impl is a full equivalence relation,
// which holds whenever the element type itself is `Eq`.
impl<T, B> Eq for MmapVec<T, B>
where
B: SegmentBuilder,
T: Eq,
{
}
impl<T, B> Drop for MmapVec<T, B>
where
B: SegmentBuilder,
{
fn drop(&mut self) {
let _ = fs::remove_file(&self.path);
}
}
/// Panics with a fixed message.
///
/// Called by the `TryFrom` conversions when `push_within_capacity` rejects a
/// value even though the vector was created with the required capacity.
/// Kept out of line and marked cold so the conversion loops stay small.
#[cold]
#[inline(never)]
fn panic_bad_capacity() {
    panic!("MmapVec was build with bad capacity");
}
/// Moves the elements of a fixed-size array into a new vector of capacity `N`.
///
/// Fails with the I/O error from `with_capacity` if the segment cannot be created.
impl<T, B, const N: usize> TryFrom<[T; N]> for MmapVec<T, B>
where
    B: SegmentBuilder,
{
    type Error = io::Error;

    fn try_from(values: [T; N]) -> Result<Self, Self::Error> {
        let mut out = Self::with_capacity(N)?;
        // Explicit `IntoIterator::into_iter` keeps by-value iteration over the array.
        IntoIterator::into_iter(values).for_each(|item| {
            if out.push_within_capacity(item).is_err() {
                panic_bad_capacity();
            }
        });
        Ok(out)
    }
}
/// Clones every element of a slice into a new vector sized to the slice length.
///
/// Fails with the I/O error from `with_capacity` if the segment cannot be created.
impl<T, B> TryFrom<&[T]> for MmapVec<T, B>
where
    T: Clone,
    B: SegmentBuilder,
{
    type Error = io::Error;

    fn try_from(values: &[T]) -> Result<Self, Self::Error> {
        let mut out = Self::with_capacity(values.len())?;
        values.iter().for_each(|item| {
            if out.push_within_capacity(item.clone()).is_err() {
                panic_bad_capacity();
            }
        });
        Ok(out)
    }
}
/// Moves the elements of a `Vec` into a new vector sized to the `Vec` length.
///
/// Fails with the I/O error from `with_capacity` if the segment cannot be created.
impl<T, B> TryFrom<Vec<T>> for MmapVec<T, B>
where
    B: SegmentBuilder,
{
    type Error = io::Error;

    fn try_from(values: Vec<T>) -> Result<Self, Self::Error> {
        let mut out = Self::with_capacity(values.len())?;
        values.into_iter().for_each(|item| {
            if out.push_within_capacity(item).is_err() {
                panic_bad_capacity();
            }
        });
        Ok(out)
    }
}
/// Serializes the vector as a sequence whose length is announced up front.
#[cfg(feature = "serde")]
impl<T, B> Serialize for MmapVec<T, B>
where
    T: Serialize,
    B: SegmentBuilder,
{
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut state = serializer.serialize_seq(Some(self.len()))?;
        // Stops at the first element that fails to serialize.
        self.iter()
            .try_for_each(|item| state.serialize_element(item))?;
        state.end()
    }
}
/// Serde visitor that builds a `MmapVec<T, B>` from a sequence.
#[cfg(feature = "serde")]
struct MmapVecVisitor<T, B: SegmentBuilder> {
// `fn() -> MmapVec<T, B>` names the output type without storing a value of it.
_marker: PhantomData<fn() -> MmapVec<T, B>>,
}
#[cfg(feature = "serde")]
impl<T, B: SegmentBuilder> MmapVecVisitor<T, B> {
    /// Creates the visitor; it carries no state besides its type parameters.
    fn new() -> Self {
        MmapVecVisitor {
            _marker: PhantomData,
        }
    }
}
/// Builds a `MmapVec<T, B>` out of any serde sequence.
#[cfg(feature = "serde")]
impl<'de, T, B> Visitor<'de> for MmapVecVisitor<T, B>
where
    T: Deserialize<'de>,
    B: SegmentBuilder,
{
    type Value = MmapVec<T, B>;

    /// Describes the expected input.
    ///
    /// Serde inserts this text after the word "expected" in its error messages,
    /// so it must be a bare noun phrase.
    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
        formatter.write_str("a sequence of elements")
    }

    /// Collects every element of the sequence into a new vector.
    ///
    /// I/O errors raised while creating or growing the vector are converted
    /// into the deserializer's error type with `Error::custom`.
    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
    where
        A: SeqAccess<'de>,
    {
        use serde::de::Error;

        // NOTE(review): the hint comes straight from the input format and is used
        // unbounded as the initial capacity. For formats that encode a length
        // prefix, hostile input could request a very large segment; consider
        // capping the pre-allocation.
        let capacity = seq.size_hint().unwrap_or(0);
        let mut output = MmapVec::<T, B>::with_capacity(capacity).map_err(Error::custom)?;

        // `push` grows the vector when the hint was missing or too small.
        while let Some(element) = seq.next_element()? {
            output.push(element).map_err(Error::custom)?;
        }
        Ok(output)
    }
}
/// Deserializes a vector from any serde sequence by way of `MmapVecVisitor`.
#[cfg(feature = "serde")]
impl<'de, T, B> Deserialize<'de> for MmapVec<T, B>
where
    T: Deserialize<'de>,
    B: SegmentBuilder,
{
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        let visitor = MmapVecVisitor::<T, B>::new();
        deserializer.deserialize_seq(visitor)
    }
}