pub mod mapper;
use std::{borrow::Cow, io::{BufWriter, Read, Seek, Write}, ops::Range};
use crate::errors::Error;
use mapper::Mapper;
pub type Section<'l> = (Range<u64>, Cow<'l, [u8]>);
#[derive(Debug)]
pub struct Layer<'l, Stream: Write + Read + Seek> {
pub bounds: Option<Range<u64>>,
mapper: Mapper<'l>,
pub size: u64,
pub read_cursor: (u64, usize),
stream: Stream,
}
#[inline]
fn get_u64(buffer: &[u8], range: Range<usize>) -> Result<u64, Error> {
Ok(u64::from_be_bytes(
if let Some(Ok(x)) = buffer.get(range).map(|x| x.try_into())
{ x }
else {
return Err(Error::DBCorrupt(Box::new(Error::InvalidLayer)));
}
))
}
#[inline]
fn until_err<T, E>(err: &mut &mut Result<(), E>, item: Result<T, E>) -> Option<T> {
match item {
Ok(item) => Some(item),
Err(e) => {
**err = Err(e);
None
}
}
}
impl<'l, Stream: Write + Read + Seek> Layer<'l, Stream> {
#[inline]
pub fn new(stream: Stream) -> Self {
Self {
bounds: None,
mapper: Mapper::new(),
size: 0,
read_cursor: (0, 0),
stream,
}
}
#[inline]
pub fn load(mut stream: Stream) -> Result<Self, Error> {
let mut buffer = [0u8; (u64::BITS as usize/8) * 3]; match stream.read_exact(&mut buffer) {
Ok(_) => (),
Err(_) => return Err(Error::DBCorrupt(Box::new(Error::InvalidLayer))),
};
let size = get_u64(&buffer, 0..8)?;
let bounds = get_u64(&buffer, 8..16)?..get_u64(&buffer, 16..24)?;
Ok(Self {
bounds: Some(bounds),
mapper: Mapper::Disk,
size,
read_cursor: (0, 0),
stream,
})
}
#[inline]
pub fn check_collisions(&mut self, range: &Range<u64>) -> Result<Box<[Range<u64>]>, Error> {
match self.bounds.as_ref() {
Some(bounds) => if bounds.end < range.start || range.end < bounds.start { return Ok(Box::new([])) },
None => return Ok(Box::new([])),
}
let mut err = Ok(());
let out = self.mapper.iter(&mut self.stream, self.size, REWIND_IDX)?
.scan(&mut err, until_err) .filter(|(r, _)| range.start < r.end && r.start < range.end)
.map(|(r, _)| range.start.max(r.start)..std::cmp::min(range.end, r.end))
.collect();
err?;
Ok(out)
}
#[inline]
pub fn check_non_collisions(&self, range: &Range<u64>, collisions: &[Range<u64>]) -> Box<[Range<u64>]> { let mut output = Vec::new();
let mut last_end = range.start;
for r in collisions.iter() {
if r.start > last_end {
output.push(last_end..r.start);
} last_end = r.end;
}
if last_end != range.end {
output.push(last_end..range.end);
} output.into_boxed_slice()
}
#[inline]
pub fn read_unchecked(&mut self, addr: &Range<u64>) -> Result<(Range<usize>, Cow<[u8]>), Error> {
let mut err = Ok(());
let out = self.mapper.iter(&mut self.stream, self.size, REWIND_IDX)? .scan(&mut err, until_err) .find(|(r, _)| r.start <= addr.start && addr.end <= r.end) .map(|(r, x)| ((addr.start-r.start) as usize..(addr.end-r.start) as usize, x));
err?;
out
.map(Ok)
.unwrap_or(Err(Error::OutOfBounds))
}
#[inline]
pub fn write_unchecked(&mut self, idx: u64, data: Cow<'l, [u8]>) -> Result<(), Error> {
let (mapper, write_cursor) = self.mapper.get_writer()?;
let range = idx..idx+data.len() as u64;
let map_idx = if write_cursor.0 == idx {
write_cursor.1
} else {
mapper
.iter()
.enumerate()
.find(|(_, (r, _))| r.start > idx)
.map(|(i, _)| i)
.unwrap_or(0) };
mapper.insert(map_idx, (range.clone(), data));
*write_cursor = (range.end, map_idx+1);
self.size += range.end - range.start;
self.bounds = Some(match self.bounds {
Some(ref x) => std::cmp::min(x.start, range.start)..std::cmp::max(x.end, range.end),
None => range,
});
Ok(())
}
pub fn flush(&mut self) -> Result<(), Error> {
const BUFFER_SIZE: usize = 1024 * 1024 * 4; let (bounds, mapper) = if let (Some(b), Mapper::Heap { mapper, .. }) = (&self.bounds, &self.mapper) { (b, mapper) } else { return Ok(()) };
let mut file = BufWriter::with_capacity(BUFFER_SIZE, &mut self.stream);
file.rewind()?;
file.write_all(&self.size.to_be_bytes())?;
file.write_all(&bounds.start.to_be_bytes())?;
file.write_all(&bounds.end.to_be_bytes())?;
for (range, data) in mapper {
file.write_all(&range.start.to_be_bytes())?;
file.write_all(&range.end.to_be_bytes())?;
file.write_all(data)?;
}
file.flush()?;
self.mapper = Mapper::Disk;
Ok(())
}
}
pub const REWIND_IDX: u64 = 8 + 8 + 8;