use fst::{map::Stream, IntoStreamer, Map, MapBuilder, Streamer};
use memmap2::{Mmap, MmapOptions};
use std::fs::File;
use std::io::BufWriter;
use crate::del_vec::DelVec;
use crate::{DelOnDrop, StrMapConfig};
type Result<T> = std::result::Result<T, fst::Error>;
pub(crate) struct Builder<T> {
inner: BuilderType,
values: Vec<T>,
}
enum BuilderType {
Mem(MapBuilder<Vec<u8>>),
File(MapBuilder<BufWriter<File>>, DelOnDrop),
}
impl<T> Builder<T> {
pub(crate) fn new(opts: &StrMapConfig) -> Result<Self> {
match opts.new_location() {
None => {
let builder = MapBuilder::memory();
Ok(Builder {
inner: BuilderType::Mem(builder),
values: Vec::new(),
})
}
Some(path) => {
let del_on_drop;
let file = std::fs::File::create(&path)?;
del_on_drop = DelOnDrop(path);
let builder = MapBuilder::new(std::io::BufWriter::new(file))?;
Ok(Builder {
inner: BuilderType::File(builder, del_on_drop),
values: Vec::new(),
})
}
}
}
pub(crate) fn insert(&mut self, key: &[u8], val: T) -> Result<()> {
let id = self.values.len() as u64;
self.values.push(val);
match self.inner {
BuilderType::Mem(ref mut b) => b.insert(key, id),
BuilderType::File(ref mut b, _) => b.insert(key, id),
}
}
pub(crate) fn finish(self) -> Result<MapPiece<T>> {
match self.inner {
BuilderType::Mem(builder) => {
let vec = builder.into_inner()?;
let mut mmap = MmapOptions::new().len(vec.len()).map_anon()?;
mmap.copy_from_slice(&vec);
Ok(MapPiece {
values: DelVec::new(self.values),
map: Map::new(mmap.make_read_only()?)?,
_file: None,
})
}
BuilderType::File(builder, del_on_drop) => {
let file = builder
.into_inner()?
.into_inner()
.map_err(|err| err.into_error())?;
drop(file);
let file = std::fs::File::open(&del_on_drop.0)?;
let mmap = unsafe { MmapOptions::new().map(&file)? };
Ok(MapPiece {
values: DelVec::new(self.values),
map: Map::new(mmap)?,
_file: Some(del_on_drop),
})
}
}
}
}
pub(crate) struct MapPiece<T> {
values: DelVec<T>,
map: Map<Mmap>,
_file: Option<DelOnDrop>,
}
impl<T> MapPiece<T> {
pub(crate) fn build(keys: &[&[u8]], vals: Vec<T>, opts: &StrMapConfig) -> Result<Self> {
let mut ids = (0..keys.len()).collect::<Vec<usize>>();
ids.sort_by_key(|i| keys[*i]);
match opts.new_location() {
None => {
let mut builder = MapBuilder::memory();
for id in ids {
builder.insert(keys[id], id as u64)?;
}
let vec = builder.into_inner()?;
let mut mmap = MmapOptions::new().len(vec.len()).map_anon()?;
mmap.copy_from_slice(&vec);
Ok(MapPiece {
values: DelVec::new(vals),
map: Map::new(mmap.make_read_only()?)?,
_file: None,
})
}
Some(path) => {
let del_on_drop;
let file = std::fs::File::create(&path)?;
del_on_drop = DelOnDrop(path);
let mut builder = MapBuilder::new(std::io::BufWriter::new(file))?;
for id in ids {
builder.insert(keys[id], id as u64)?;
}
let file = builder
.into_inner()?
.into_inner()
.map_err(|err| err.into_error())?;
drop(file);
let file = std::fs::File::open(&del_on_drop.0)?;
let mmap = unsafe { MmapOptions::new().map(&file)? };
Ok(MapPiece {
values: DelVec::new(vals),
map: Map::new(mmap)?,
_file: Some(del_on_drop),
})
}
}
}
pub(crate) fn has_key(&self, key: &[u8]) -> bool {
self.get(key).is_some()
}
pub(crate) fn with_next<'a, F>(&'a self, curr: &[u8], func: F)
where
F: for<'b> FnOnce(&'b [u8], &'a T),
{
let mut range = self.map.range().gt(curr).into_stream();
while let Some((key, id)) = range.next() {
if let Some(value) = self.values.get(id as usize) {
func(key, value);
return;
}
}
}
pub(crate) fn get(&self, key: &[u8]) -> Option<&T> {
self.map
.get(key)
.and_then(|id| self.values.get(id as usize))
}
pub(crate) fn get_mut(&mut self, key: &[u8]) -> Option<&mut T> {
self.map
.get(key)
.and_then(|id| self.values.get_mut(id as usize))
}
pub(crate) fn delete(&mut self, key: &[u8]) -> bool {
if let Some(idx) = self.map.get(key) {
self.values.delete(idx as usize)
} else {
false
}
}
pub(crate) fn len(&self) -> usize {
self.values.len()
}
pub(crate) fn capacity(&self) -> usize {
self.values.capacity()
}
pub(crate) fn drain(&mut self) -> Drain<'_, T> {
Drain {
values: &mut self.values,
stream: self.map.stream(),
}
}
}
pub(crate) struct Drain<'a, T> {
values: &'a mut DelVec<T>,
stream: Stream<'a>,
}
impl<'a, T> Drain<'a, T> {
pub(crate) fn next(&mut self) -> Option<(&[u8], T)> {
loop {
match self.stream.next() {
Some((key, id)) => {
let id = id as usize;
if let Some(value) = self.values.take(id) {
let key = unsafe { &*(key as *const [u8]) };
return Some((key, value));
}
}
None => return None,
}
}
}
}
pub(crate) fn merge_pieces<T>(
mut a: MapPiece<T>,
mut b: MapPiece<T>,
opts: &StrMapConfig,
) -> Result<MapPiece<T>> {
let mut a_drain = a.drain();
let mut b_drain = b.drain();
let mut next_a = [].as_slice();
let mut next_b = [].as_slice();
let mut next = (None, None);
let mut builder = Builder::new(opts)?;
loop {
if next.0.is_none() {
next_a = [].as_slice();
if let Some((key, val)) = a_drain.next() {
next_a = key;
next.0 = Some(val);
}
}
if next.1.is_none() {
next_b = [].as_slice();
if let Some((key, val)) = b_drain.next() {
next_b = key;
next.1 = Some(val);
}
}
match next {
(Some(a), Some(b)) if next_a < next_b => {
builder.insert(next_a, a)?;
next = (None, Some(b));
}
(Some(a), Some(b)) => {
builder.insert(next_b, b)?;
next = (Some(a), None);
}
(Some(a), None) => {
builder.insert(next_a, a)?;
next = (None, None);
}
(None, Some(b)) => {
builder.insert(next_b, b)?;
next = (None, None);
}
(None, None) => break,
}
}
builder.finish()
}