use std::{cmp, fmt, fs, io, mem};
use std::borrow::Cow;
use std::collections::HashSet;
use std::hash::Hasher;
use std::marker::PhantomData;
use std::num::{NonZeroU64, NonZeroUsize};
use std::ops::{DerefMut, Range};
use std::path::Path;
use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
use bytes::Bytes;
use siphasher::sip::SipHasher24;
use crate::utils::sync::{Mutex, MutexGuard};
/// Number of hash buckets in the on-disk index of a newly created archive.
const DEFAULT_BUCKET_COUNT: usize = 1024;
/// Objects are stored padded to a multiple of this many bytes
/// (see `page_object_size`), so freed blocks can be reused more easily.
const PAGE_SIZE: usize = 256;
/// A read-write object archive stored in a single file.
///
/// Objects are addressed by a byte-string name hashed into a fixed number
/// of index buckets; each bucket is the head of an on-disk linked chain of
/// objects. Deleted space is tracked in a separate chain of empty blocks.
#[derive(Debug)]
pub struct Archive<Meta> {
    /// The underlying file, possibly memory-mapped on Unix.
    file: Storage,
    /// Hash key and bucket count, persisted in the file header.
    meta: ArchiveMeta,
    /// The concrete `Meta` type stored with each object; not held by value.
    marker: PhantomData<Meta>,
}
impl<Meta> Archive<Meta> {
    /// Creates a new archive at `path`.
    ///
    /// Fails if the file already exists (`create_new`).
    pub fn create(path: impl AsRef<Path>) -> Result<Self, ArchiveError> {
        Self::create_with_file(
            fs::OpenOptions::new()
                .read(true).write(true).create_new(true)
                .open(path)?
        )
    }

    /// Initializes `file` as a fresh, empty archive.
    ///
    /// Writes the magic, the archive meta, and reserves a zeroed index
    /// (all buckets plus the empty-chain slot). Any previous contents of
    /// the file are discarded.
    pub fn create_with_file(
        mut file: fs::File
    ) -> Result<Self, ArchiveError> {
        file.set_len(0)?;
        let meta = ArchiveMeta::new(DEFAULT_BUCKET_COUNT);
        file.write_all(&FILE_MAGIC)?;
        meta.write(&mut file)?;
        // Reserve space for the index; `set_len` fills it with zeros,
        // which decode as "no chain head" (`NonZeroU64::new(0)` is None).
        let len = file.stream_position()? + Self::index_size(&meta);
        file.set_len(len)?;
        Ok(Self {
            file: Storage::new(file, true)?,
            meta,
            marker: PhantomData,
        })
    }

    /// Opens an existing archive at `path`.
    ///
    /// Verifies the file magic (which also encodes format version,
    /// endianness and pointer width) before reading the meta.
    pub fn open(
        path: impl AsRef<Path>, writable: bool
    ) -> Result<Self, OpenError> {
        let mut file =
            fs::OpenOptions::new().read(true).write(writable).open(path)?;
        let mut magic = [0; MAGIC_SIZE];
        file.read_exact(&mut magic)?;
        if magic != FILE_MAGIC {
            return Err(ArchiveError::Corrupt("invalid magic").into())
        }
        let meta = ArchiveMeta::read(&mut file)?;
        Ok(Self {
            file: Storage::new(file, writable)?,
            meta,
            marker: PhantomData,
        })
    }

    /// Checks the archive's structural invariants and gathers statistics.
    ///
    /// Walks every bucket chain and the empty chain, verifying that each
    /// object hashes into the bucket it is chained from and that the
    /// blocks, sorted by offset, form a contiguous sequence with no gaps
    /// or overlaps.
    pub fn verify(&self) -> Result<ArchiveStats, ArchiveError>
    where Meta: ObjectMeta {
        let mut objects = Vec::new();
        let mut stats = ArchiveStats::default();
        for idx in 0..usize_to_u64(self.meta.bucket_count) {
            let mut start = self.get_index(idx)?;
            while let Some(pos) = start {
                let (header, name) = ObjectHeader::read_with_name(
                    &self.file, pos.into()
                )?;
                if self.meta.hash_name(&name) != idx {
                    return Err(ArchiveError::Corrupt("incorrect hash"))
                }
                objects.push((u64::from(pos), header.size));
                stats.object_count += 1;
                stats.object_size += header.size;
                // Padding is the slack between the paged block size and
                // the minimal size actually needed by the object.
                stats.padding_size += header.size.saturating_sub(
                    Self::min_object_size(header.name_len, header.data_len)
                );
                start = header.next;
            }
        }
        let mut start = self.get_empty_index()?;
        while let Some(pos) = start {
            let header = ObjectHeader::read(&self.file, pos.into())?;
            objects.push((u64::from(pos), header.size));
            stats.empty_count += 1;
            stats.empty_size += header.size;
            // `empty_min` starts at 0 meaning "unset", so the first block
            // initializes it rather than comparing against 0.
            if stats.empty_min == 0 {
                stats.empty_min = header.size
            }
            else {
                stats.empty_min = cmp::min(stats.empty_min, header.size);
            }
            stats.empty_max = cmp::max(stats.empty_max, header.size);
            start = header.next;
        }
        // Every block must start exactly where the previous one ends.
        objects.sort_by(|left, right| left.0.cmp(&right.0));
        for window in objects.windows(2) {
            if window[1].0 != window[0].0 + window[0].1 {
                return Err(ArchiveError::Corrupt("broken sequence"))
            }
        }
        Ok(stats)
    }

    /// Returns an iterator over all `(name, meta, data)` triples.
    pub fn objects(&self) -> Result<ObjectsIter<'_, Meta>, ArchiveError> {
        ObjectsIter::new(self)
    }
}
impl<Meta: ObjectMeta> Archive<Meta> {
    /// Returns the data of the object called `name`.
    pub fn fetch(
        &self,
        name: &[u8],
    ) -> Result<Cow<'_, [u8]>, FetchError> {
        let hash = self.meta.hash_name(name);
        let found = match self.find(hash, name)? {
            Some(found) => found,
            None => return Err(FetchError::NotFound),
        };
        self.file.read(found.data_start::<Meta>(), |read| {
            Ok(read.read_slice(found.header.data_len)?)
        })
    }

    /// Like [`fetch`][Self::fetch] but always returns owned `Bytes`.
    pub fn fetch_bytes(
        &self,
        name: &[u8],
    ) -> Result<Bytes, FetchError> {
        self.fetch(name).map(|res| {
            match res {
                Cow::Borrowed(slice) => Bytes::copy_from_slice(slice),
                Cow::Owned(vec) => vec.into()
            }
        })
    }

    /// Returns the data of `name` only if `check` accepts its meta.
    ///
    /// The meta is read and checked before the data, under the same
    /// storage read, so meta and data come from the same object.
    pub fn fetch_if(
        &self,
        name: &[u8],
        check: impl FnOnce(&Meta) -> Result<(), Meta::ConsistencyError>,
    ) -> Result<Cow<'_, [u8]>, AccessError<Meta::ConsistencyError>> {
        let hash = self.meta.hash_name(name);
        let found = match self.find(hash, name)? {
            Some(found) => found,
            None => return Err(AccessError::NotFound),
        };
        self.file.read(found.meta_start(), |read| {
            check(
                &Meta::read(read)?
            ).map_err(AccessError::Inconsistent)?;
            Ok(read.read_slice(found.header.data_len)?)
        })
    }

    /// Adds a new object; fails if `name` already exists.
    pub fn publish(
        &mut self, name: &[u8], meta: &Meta, data: &[u8]
    ) -> Result<(), PublishError> {
        let hash = self.meta.hash_name(name);
        if self.find(hash, name)?.is_some() {
            return Err(PublishError::AlreadyExists)
        }
        self.publish_not_found(hash, name, meta, data)?;
        Ok(())
    }

    /// Writes a new object, reusing an empty block if one fits,
    /// otherwise appending at the end of the file.
    fn publish_not_found(
        &mut self,
        hash: u64, name: &[u8], meta: &Meta, data: &[u8]
    ) -> Result<(), ArchiveError> {
        match self.find_empty(name, data)? {
            Some((empty, pos)) => {
                self.publish_replace(hash, name, meta, data, empty, pos)?
            }
            None => {
                self.publish_append(hash, name, meta, data)?
            }
        }
        Ok(())
    }

    /// Writes the object into the empty block at `start`.
    ///
    /// The empty block is first unlinked from the empty chain; the new
    /// object is prepended to its bucket chain. If the block is larger
    /// than the object, the remainder becomes a new empty block that is
    /// pushed back onto the empty chain.
    fn publish_replace(
        &mut self,
        hash: u64, name: &[u8], meta: &Meta, data: &[u8],
        mut empty: ObjectHeader, start: NonZeroU64,
    ) -> Result<(), ArchiveError> {
        self.unlink_empty(start.into(), empty.next)?;
        let empty_end = u64::from(start) + empty.size;
        let head = ObjectHeader::new(
            Self::page_object_size(name, data),
            self.get_index(hash)?,
            name, data
        );
        self.write_object(start.into(), head, name, meta, data)?;
        let object_end = u64::from(start) + head.size;
        self.set_index(hash, start.into())?;
        if empty_end > object_end {
            empty.size = empty_end - object_end;
            // `fits` guarantees the remainder (if any) can hold a header.
            assert!(empty.size >= ObjectHeader::SIZE);
            empty.next = self.get_empty_index()?;
            empty.write(&mut self.file, object_end)?;
            self.set_empty_index(NonZeroU64::new(object_end))?;
        }
        Ok(())
    }

    /// Writes the object at the end of the file and prepends it to its
    /// bucket chain.
    fn publish_append(
        &mut self, hash: u64, name: &[u8], meta: &Meta, data: &[u8]
    ) -> Result<(), ArchiveError> {
        let start = self.file.size;
        let head = ObjectHeader::new(
            Self::page_object_size(name, data),
            self.get_index(hash)?,
            name, data,
        );
        self.write_object(start, head, name, meta, data)?;
        self.set_index(hash, NonZeroU64::new(start))?;
        Ok(())
    }

    /// Replaces the data (and meta) of an existing object.
    ///
    /// `check` is run against the stored meta first. If the paged size of
    /// the new content equals the old block size, the object is
    /// overwritten in place; otherwise it is deleted and re-published.
    pub fn update(
        &mut self,
        name: &[u8], meta: &Meta, data: &[u8],
        check: impl FnOnce(&Meta) -> Result<(), Meta::ConsistencyError>,
    ) -> Result<(), AccessError<Meta::ConsistencyError>> {
        let hash = self.meta.hash_name(name);
        let mut found = match self.find(hash, name)? {
            Some(found) => found,
            None => return Err(AccessError::NotFound),
        };
        check(
            &self.file.read(found.meta_start(), |read| Meta::read(read))?
        ).map_err(AccessError::Inconsistent)?;
        let new_size = Self::page_object_size(name, data);
        if found.header.size == new_size {
            // Same block size: overwrite in place, keeping chain links.
            found.header.data_len = data.len();
            self.write_object(found.start, found.header, name, meta, data)?;
        }
        else {
            self.delete_found(hash, found)?;
            self.publish_not_found(hash, name, meta, data)?;
        }
        Ok(())
    }

    /// Deletes the object called `name` if `check` accepts its meta.
    pub fn delete(
        &mut self,
        name: &[u8],
        check: impl FnOnce(&Meta) -> Result<(), Meta::ConsistencyError>,
    ) -> Result<(), AccessError<Meta::ConsistencyError>> {
        let hash = self.meta.hash_name(name);
        let found = match self.find(hash, name)? {
            Some(found) => found,
            None => return Err(AccessError::NotFound),
        };
        check(
            &self.file.read(found.meta_start(), |read| Meta::read(read))?
        ).map_err(AccessError::Inconsistent)?;
        Ok(self.delete_found(hash, found)?)
    }

    /// Unlinks `found` from its bucket chain and releases its block.
    fn delete_found(
        &mut self, hash: u64, found: FoundObject
    ) -> Result<(), ArchiveError> {
        match found.prev {
            Some(pos) => {
                // Object was mid-chain: patch the predecessor's link.
                ObjectHeader::update_next(
                    pos.into(), found.header.next, &mut self.file)?
            }
            // Object was the chain head: patch the bucket slot.
            None => self.set_index(hash, found.header.next)?,
        }
        self.create_empty(found.start, found.header.size)?;
        Ok(())
    }

    /// Turns the block at `start` into empty space.
    ///
    /// A block at the very end of the file is released by truncating;
    /// otherwise it is merged with a directly following empty block (if
    /// any) and pushed onto the empty chain.
    fn create_empty(
        &mut self, start: u64, mut size: u64
    ) -> Result<(), ArchiveError> {
        let next_start = start.saturating_add(size);
        if next_start == self.file.size {
            self.file.set_len(start)?;
        }
        else {
            let header = ObjectHeader::read(&self.file, next_start)?;
            if header.is_empty {
                // Coalesce with the adjacent empty block that follows.
                self.unlink_empty(next_start, header.next)?;
                size += header.size;
            }
            ObjectHeader::new_empty(size, self.get_empty_index()?).write(
                &mut self.file, start
            )?;
            self.set_empty_index(NonZeroU64::new(start))?;
        }
        Ok(())
    }

    /// Removes the empty block at `start` from the empty chain,
    /// relinking its predecessor (or the chain head) to `next`.
    fn unlink_empty(
        &mut self, start: u64, next: Option<NonZeroU64>
    ) -> Result<(), ArchiveError> {
        let mut curr = self.get_empty_index()?;
        let start = NonZeroU64::new(start);
        if curr == start {
            self.set_empty_index(next)?;
            return Ok(())
        }
        while let Some(pos) = curr {
            let header = ObjectHeader::read(&self.file, pos.into())?;
            if header.next == start {
                ObjectHeader::update_next(pos.into(), next, &mut self.file)?;
                return Ok(())
            }
            curr = header.next;
        }
        Err(ArchiveError::Corrupt("empty object not in empty chain"))
    }

    /// Walks the bucket chain for `hash` looking for `name`, remembering
    /// the predecessor so the caller can unlink the object.
    fn find(
        &self, hash: u64, name: &[u8]
    ) -> Result<Option<FoundObject>, ArchiveError> {
        let mut start = self.get_index(hash)?;
        let mut prev = None;
        while let Some(pos) = start {
            let (header, object_name) = ObjectHeader::read_with_name(
                &self.file, pos.into()
            )?;
            if name == object_name.as_ref() {
                return Ok(Some(FoundObject {
                    start: pos.into(),
                    header,
                    prev,
                }))
            }
            prev = Some(pos);
            start = header.next;
        }
        Ok(None)
    }

    /// Finds the best-fitting empty block for an object with the given
    /// name and data, i.e. the smallest block that `fits`.
    ///
    /// NOTE(review): this scans the whole empty chain and sorts all
    /// candidates only to take the minimum; `min_by_key` over the scan
    /// would avoid the allocation and sort.
    fn find_empty(
        &self, name: &[u8], data: &[u8]
    ) -> Result<Option<(ObjectHeader, NonZeroU64)>, ArchiveError> {
        let mut start = self.get_empty_index()?;
        if start.is_none() {
            return Ok(None)
        }
        let size = Self::page_object_size(name, data);
        let mut candidates = Vec::new();
        while let Some(pos) = start {
            let header = ObjectHeader::read(&self.file, pos.into())?;
            start = header.next;
            if Self::fits(header.size, size) {
                candidates.push((header, pos));
            }
        }
        if candidates.is_empty() {
            return Ok(None)
        }
        candidates.sort_by(|left, right| left.0.size.cmp(&right.0.size));
        Ok(candidates.first().copied())
    }

    /// Writes header, name, meta, data and zero padding at `start`.
    fn write_object(
        &mut self, start: u64,
        head: ObjectHeader, name: &[u8], meta: &Meta, data: &[u8]
    ) -> Result<(), ArchiveError> {
        self.file.write(start, |write| {
            head.write_into(write)?;
            write.write(name)?;
            meta.write(write)?;
            write.write(data)?;
            // Padding is always smaller than one page because `head.size`
            // is the minimal size rounded up to a page multiple.
            let padding = usize::try_from(
                head.size.checked_sub(
                    Self::min_object_size(name.len(), data.len())
                ).expect("paged size smaller than minimal size")
            ).expect("padding larger than page size");
            if padding > 0 {
                static PAGE: [u8; PAGE_SIZE] = [0u8; PAGE_SIZE];
                write.write(&PAGE[..padding])?;
            }
            Ok(())
        })
    }

    /// The exact number of bytes an object needs: header, name, meta,
    /// data — without padding.
    fn min_object_size(name_len: usize, data_len: usize) -> u64 {
        ObjectHeader::SIZE
        + usize_to_u64(name_len)
        + usize_to_u64(Meta::SIZE)
        + usize_to_u64(data_len)
    }

    /// The minimal object size rounded up to a multiple of `PAGE_SIZE`.
    fn page_object_size(name: &[u8], data: &[u8]) -> u64 {
        Self::min_object_size(
            name.len(), data.len()
        ).next_multiple_of(usize_to_u64(PAGE_SIZE))
    }

    /// Whether an object of `object_size` can be placed into an empty
    /// block of `empty_size`: either exactly, or leaving at least enough
    /// room for the header of a remainder empty block.
    fn fits(empty_size: u64, object_size: u64) -> bool {
        empty_size == object_size
        || empty_size >= object_size + ObjectHeader::SIZE
    }
}
impl<Meta> Archive<Meta> {
    /// Width of one bucket slot in the on-disk index.
    const BUCKET_SIZE: usize = mem::size_of::<u64>();

    /// Total on-disk size of the index: one slot per hash bucket plus the
    /// extra trailing slot for the head of the empty-block chain.
    fn index_size(meta: &ArchiveMeta) -> u64 {
        usize_to_u64((meta.bucket_count + 1) * Self::BUCKET_SIZE)
    }

    /// File offset where the index begins: right after the magic and the
    /// serialized archive meta.
    fn index_base() -> u64 {
        usize_to_u64(MAGIC_SIZE) + ArchiveMeta::size()
    }

    /// File offset of the index slot for bucket `hash`.
    fn index_pos(&self, hash: u64) -> u64 {
        Self::index_base() + hash * usize_to_u64(Self::BUCKET_SIZE)
    }

    /// File offset of the slot holding the head of the empty-block chain,
    /// which sits directly behind all bucket slots.
    fn empty_index_pos(&self) -> u64 {
        Self::index_base()
            + usize_to_u64(self.meta.bucket_count * Self::BUCKET_SIZE)
    }

    /// Reads the head of the object chain for bucket `hash`; a stored
    /// zero means the bucket is empty.
    fn get_index(
        &self, hash: u64
    ) -> Result<Option<NonZeroU64>, ArchiveError> {
        let raw = self.file.read(
            self.index_pos(hash), |read| read.read_u64()
        )?;
        Ok(NonZeroU64::new(raw))
    }

    /// Reads the head of the empty-block chain; a stored zero means the
    /// chain is empty.
    fn get_empty_index(&self) -> Result<Option<NonZeroU64>, ArchiveError> {
        let raw = self.file.read(
            self.empty_index_pos(), |read| read.read_u64()
        )?;
        Ok(NonZeroU64::new(raw))
    }

    /// Stores `pos` as the head of the object chain for bucket `hash`,
    /// encoding `None` as zero.
    fn set_index(
        &mut self, hash: u64, pos: Option<NonZeroU64>,
    ) -> Result<(), ArchiveError> {
        self.file.write(self.index_pos(hash), |write| {
            write.write_u64(pos.map_or(0, u64::from))
        })
    }

    /// Stores `pos` as the head of the empty-block chain, encoding `None`
    /// as zero.
    fn set_empty_index(
        &mut self, pos: Option<NonZeroU64>
    ) -> Result<(), ArchiveError> {
        self.file.write(self.empty_index_pos(), |write| {
            write.write_u64(pos.map_or(0, u64::from))
        })
    }
}
/// A write-only archive that is built sequentially.
///
/// Objects are appended to a buffered file while the index is kept in
/// memory; `finalize` writes the index into the space reserved for it.
#[derive(Debug)]
pub struct AppendArchive<Meta> {
    /// Buffered output file; objects are written strictly append-only.
    file: BufWriter<fs::File>,
    /// In-memory index, written to disk only by `finalize`.
    index: AppendIndex,
    /// Names published so far, used to reject duplicates.
    names: HashSet<Box<[u8]>>,
    /// Hash key and bucket count, persisted in the file header.
    meta: ArchiveMeta,
    /// The concrete `Meta` type stored with each object; not held by value.
    marker: PhantomData<Meta>,
}
impl<Meta: ObjectMeta> AppendArchive<Meta> {
    /// Creates a new append-only archive at `path`.
    ///
    /// Fails if the file already exists (`create_new`).
    pub fn create(path: impl AsRef<Path>) -> Result<Self, ArchiveError> {
        Self::create_with_file(
            fs::OpenOptions::new()
                .read(true).write(true).create_new(true)
                .open(path)?
        )
    }

    /// Initializes `file` as a fresh archive: magic, meta, and zeroed
    /// space reserved for the index. Positions the file at its end so
    /// objects are appended after the index.
    pub fn create_with_file(
        mut file: fs::File
    ) -> Result<Self, ArchiveError> {
        let index = AppendIndex::new();
        file.set_len(0)?;
        let meta = ArchiveMeta::new(index.bucket_count());
        file.write_all(&FILE_MAGIC)?;
        meta.write(&mut file)?;
        let len = file.stream_position()? + index.index_size();
        file.set_len(len)?;
        file.seek(SeekFrom::End(0))?;
        Ok(Self {
            file: BufWriter::new(file),
            index,
            names: Default::default(),
            meta,
            marker: PhantomData,
        })
    }

    /// Appends a new object; fails if `name` was already published.
    pub fn publish(
        &mut self, name: &[u8], meta: &Meta, data: &[u8]
    ) -> Result<(), PublishError> {
        if self.names.contains(name) {
            return Err(PublishError::AlreadyExists)
        }
        let hash = self.meta.hash_name(name);
        let start = self.file.stream_position()?;
        // Prepend to the in-memory bucket chain: the new object links to
        // the previous chain head and becomes the new head.
        let head = ObjectHeader::new(
            Archive::<Meta>::page_object_size(name, data),
            self.index.get(hash),
            name, data,
        );
        self.write_object(head, name, meta, data)?;
        self.index.set(hash, NonZeroU64::new(start));
        self.names.insert(name.into());
        Ok(())
    }

    /// Writes header, name, meta, data and zero padding at the current
    /// file position.
    fn write_object(
        &mut self, head: ObjectHeader, name: &[u8], meta: &Meta, data: &[u8]
    ) -> Result<(), ArchiveError> {
        let mut write = StorageWrite::new_raw_append(&mut self.file);
        head.write_into(&mut write)?;
        write.write(name)?;
        meta.write(&mut write)?;
        write.write(data)?;
        // Padding is always smaller than one page because `head.size` is
        // the minimal size rounded up to a page multiple.
        let padding = usize::try_from(
            head.size.checked_sub(
                Archive::<Meta>::min_object_size(name.len(), data.len())
            ).expect("paged size smaller than minimal size")
        ).expect("padding larger than page size");
        if padding > 0 {
            static PAGE: [u8; PAGE_SIZE] = [0u8; PAGE_SIZE];
            write.write(&PAGE[..padding])?;
        }
        Ok(())
    }

    /// Writes the in-memory index into its reserved slot and flushes.
    ///
    /// NOTE(review): this leaves the file position just after the index,
    /// so a `publish` after `finalize` would overwrite object data —
    /// confirm that `finalize` is always the last operation.
    pub fn finalize(&mut self) -> Result<(), ArchiveError> {
        self.file.seek(SeekFrom::Start(
            usize_to_u64(MAGIC_SIZE) + ArchiveMeta::size()
        ))?;
        self.index.write(&mut self.file)?;
        self.file.flush()?;
        Ok(())
    }
}
/// An iterator over all objects of an [`Archive`], bucket by bucket.
pub struct ObjectsIter<'a, Meta> {
    /// The archive being iterated.
    archive: &'a Archive<Meta>,
    /// Bucket indexes not yet visited.
    buckets: Range<u64>,
    /// Position of the next object in the current chain, if any.
    next: Option<NonZeroU64>,
}
impl<'a, Meta> ObjectsIter<'a, Meta> {
    /// Creates an iterator positioned at the head of bucket 0's chain,
    /// with all remaining buckets queued up.
    fn new(archive: &'a Archive<Meta>) -> Result<Self, ArchiveError> {
        let first = archive.get_index(0)?;
        let remaining = 1..usize_to_u64(archive.meta.bucket_count);
        Ok(Self {
            archive,
            buckets: remaining,
            next: first,
        })
    }
}
impl<'a, Meta: ObjectMeta> ObjectsIter<'a, Meta> {
    /// Advances the iterator, in `Result<Option<..>>` form.
    ///
    /// Follows the current chain until it ends, then moves to the next
    /// non-empty bucket. Returns `Ok(None)` when all buckets are done.
    #[allow(clippy::type_complexity)]
    fn transposed_next(
        &mut self
    ) -> Result<Option<(Cow<'a, [u8]>, Meta, Cow<'a, [u8]>)>, ArchiveError> {
        loop {
            if let Some(pos) = self.next {
                // Read header, name, meta and data in one storage read so
                // they are taken from the same object.
                let (next, res) = self.archive.file.read(pos.into(), |read| {
                    let header = ObjectHeader::read_from(read)?;
                    let name = read.read_slice(header.name_len)?;
                    let meta = Meta::read(read)?;
                    let data = read.read_slice(header.data_len)?;
                    Ok::<_, ArchiveError>((header.next, (name, meta, data)))
                })?;
                self.next = next;
                return Ok(Some(res))
            }
            // Current chain exhausted: start the next bucket's chain.
            let idx = match self.buckets.next() {
                Some(idx) => idx,
                None => return Ok(None)
            };
            self.next = self.archive.get_index(idx)?;
        }
    }
}
impl<'a, Meta: ObjectMeta> Iterator for ObjectsIter<'a, Meta> {
    type Item = Result<(Cow<'a, [u8]>, Meta, Cow<'a, [u8]>), ArchiveError>;

    /// Reshapes the `Result<Option<_>>` of `transposed_next` into the
    /// `Option<Result<_>>` the `Iterator` contract requires.
    fn next(&mut self) -> Option<Self::Item> {
        match self.transposed_next() {
            Ok(Some(item)) => Some(Ok(item)),
            Ok(None) => None,
            Err(err) => Some(Err(err)),
        }
    }
}
/// Per-object metadata that can be serialized into an archive.
pub trait ObjectMeta: Sized {
    /// The fixed number of bytes the serialized meta occupies on disk.
    const SIZE: usize;
    /// The error reported when a consistency check rejects stored meta.
    type ConsistencyError: fmt::Debug;
    /// Serializes the meta; must write exactly [`SIZE`][Self::SIZE] bytes.
    fn write(&self, write: &mut StorageWrite) -> Result<(), ArchiveError>;
    /// Deserializes the meta; must read exactly [`SIZE`][Self::SIZE] bytes.
    fn read(read: &mut StorageRead) -> Result<Self, ArchiveError>;
}
/// Archive-wide parameters stored in the file header after the magic.
#[derive(Default, Debug)]
struct ArchiveMeta {
    /// Random SipHash key, chosen at creation time per archive.
    hash_key: [u8; 16],
    /// Number of hash buckets in the index.
    bucket_count: usize,
}
impl ArchiveMeta {
    /// Creates fresh meta with a random hash key.
    fn new(bucket_count: usize) -> Self {
        ArchiveMeta {
            hash_key: rand::random(),
            bucket_count,
        }
    }

    /// The serialized size of the meta: hash key plus bucket count.
    const fn size() -> u64 {
        usize_to_u64(
            mem::size_of::<[u8; 16]>() + mem::size_of::<usize>()
        )
    }

    /// Serializes the meta in native byte order (the file magic encodes
    /// endianness and pointer width, so native encoding is safe here).
    fn write(&self, target: &mut impl io::Write) -> Result<(), io::Error> {
        target.write_all(&self.hash_key)?;
        target.write_all(&self.bucket_count.to_ne_bytes())?;
        Ok(())
    }

    /// Deserializes the meta written by [`write`][Self::write].
    fn read(source: &mut impl io::Read) -> Result<Self, io::Error> {
        let mut res = Self::default();
        source.read_exact(&mut res.hash_key)?;
        let mut buf = [0u8; mem::size_of::<usize>()];
        source.read_exact(&mut buf)?;
        res.bucket_count = usize::from_ne_bytes(buf);
        // A zero bucket count would make `hash_name` panic with a
        // division by zero on every lookup, so reject a corrupt header
        // here with a proper error instead.
        if res.bucket_count == 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData, "zero bucket count"
            ))
        }
        Ok(res)
    }

    /// Maps `name` to its bucket index, i.e. a value below `bucket_count`.
    fn hash_name(&self, name: &[u8]) -> u64 {
        let mut hasher = SipHasher24::new_with_key(&self.hash_key);
        hasher.write(name);
        hasher.finish() % usize_to_u64(self.bucket_count)
    }
}
/// The fixed-size header preceding every block (object or empty) on disk.
#[derive(Clone, Copy, Debug)]
struct ObjectHeader {
    /// Total size of the block in bytes, header and padding included.
    size: u64,
    /// Offset of the next block in the same chain (bucket or empty).
    next: Option<NonZeroU64>,
    /// Whether this block is empty space rather than an object.
    is_empty: bool,
    /// Length of the object's name in bytes (0 for empty blocks).
    name_len: usize,
    /// Length of the object's data in bytes (0 for empty blocks).
    data_len: usize,
}
impl ObjectHeader {
    /// Creates a header for an object block of the given total `size`.
    fn new(
        size: u64, next: Option<NonZeroU64>, name: &[u8], data: &[u8]
    ) -> Self {
        ObjectHeader {
            size,
            next,
            is_empty: false,
            name_len: name.len(),
            data_len: data.len()
        }
    }

    /// Creates a header for an empty block of the given total `size`.
    fn new_empty(size: u64, next: Option<NonZeroU64>) -> Self {
        ObjectHeader {
            size, next,
            is_empty: true,
            name_len: 0,
            data_len: 0,
        }
    }

    /// Reads a header from the current position.
    ///
    /// Field order must match [`write_into`][Self::write_into] exactly.
    fn read_from(read: &mut StorageRead) -> Result<Self, ArchiveError> {
        Ok(Self {
            size: read.read_u64()?,
            next: NonZeroU64::new(read.read_u64()?),
            is_empty: read.read_bool()?,
            name_len: read.read_usize()?,
            data_len: read.read_usize()?,
        })
    }

    /// Reads the header of the block at `start`.
    fn read(
        storage: &Storage, start: u64
    ) -> Result<Self, ArchiveError> {
        storage.read(start, Self::read_from)
    }

    /// Reads the header of the block at `start` together with the object
    /// name that immediately follows it.
    fn read_with_name(
        storage: &Storage, start: u64
    ) -> Result<(Self, Cow<'_, [u8]>), ArchiveError> {
        storage.read(start, |read| {
            let header = Self::read_from(read)?;
            let name = read.read_slice(header.name_len)?;
            Ok((header, name))
        })
    }

    /// Writes the header at the current position.
    ///
    /// Field order must match [`read_from`][Self::read_from] exactly.
    fn write_into(
        &self, write: &mut StorageWrite
    ) -> Result<(), ArchiveError> {
        write.write_u64(self.size)?;
        write.write_nonzero_u64(self.next)?;
        write.write_bool(self.is_empty)?;
        write.write_usize(self.name_len)?;
        write.write_usize(self.data_len)?;
        Ok(())
    }

    /// Writes the header of the block at `start`.
    fn write(
        &self, storage: &mut Storage, start: u64
    ) -> Result<(), ArchiveError> {
        storage.write(start, |write| self.write_into(write))
    }

    /// Overwrites only the `next` field of the header at `start`.
    ///
    /// The field offset skips the leading `size` field — keep in sync
    /// with the write order in `write_into`.
    fn update_next(
        start: u64, new_next: Option<NonZeroU64>, storage: &mut Storage
    ) -> Result<(), ArchiveError> {
        storage.write(
            start + usize_to_u64(mem::size_of::<u64>()),
            |write| write.write_nonzero_u64(new_next),
        )
    }

    /// The serialized header size: size, next, is_empty, name_len,
    /// data_len — in this order.
    const SIZE: u64 = usize_to_u64(
        mem::size_of::<u64>()
        + mem::size_of::<u64>()
        + mem::size_of::<u8>()
        + mem::size_of::<usize>()
        + mem::size_of::<usize>()
    );

    /// Offset of the serialized meta within a block starting at `start`.
    fn meta_start(&self, start: u64) -> u64 {
        start + Self::SIZE + usize_to_u64(self.name_len)
    }

    /// Offset of the object data within a block starting at `start`.
    fn data_start<Meta: ObjectMeta>(&self, start: u64) -> u64 {
        start + Self::SIZE
        + usize_to_u64(Meta::SIZE)
        + usize_to_u64(self.name_len)
    }
}
/// The result of a successful chain lookup in [`Archive::find`].
struct FoundObject {
    /// Offset of the object's block.
    start: u64,
    /// The object's header.
    header: ObjectHeader,
    /// Offset of the preceding object in the bucket chain, if any;
    /// needed to unlink the object on delete.
    prev: Option<NonZeroU64>,
}
impl FoundObject {
    /// Absolute file offset of the object's serialized meta.
    fn meta_start(&self) -> u64 {
        let Self { start, header, .. } = self;
        header.meta_start(*start)
    }

    /// Absolute file offset of the object's data.
    fn data_start<Meta: ObjectMeta>(&self) -> u64 {
        let Self { start, header, .. } = self;
        header.data_start::<Meta>(*start)
    }
}
/// The in-memory bucket index of an [`AppendArchive`].
#[derive(Debug)]
struct AppendIndex {
    /// One chain head per bucket; `None` for an empty bucket.
    index: [Option<NonZeroU64>; DEFAULT_BUCKET_COUNT],
}
impl AppendIndex {
    /// Creates an index with every bucket empty.
    fn new() -> Self {
        Self { index: [None; DEFAULT_BUCKET_COUNT] }
    }

    /// The number of hash buckets covered by this index.
    fn bucket_count(&self) -> usize {
        DEFAULT_BUCKET_COUNT
    }

    /// On-disk size of the index: one `u64` slot per bucket plus the
    /// trailing slot for the empty chain.
    fn index_size(&self) -> u64 {
        usize_to_u64((DEFAULT_BUCKET_COUNT + 1) * mem::size_of::<u64>())
    }

    /// Returns the chain head for `hash`.
    fn get(&self, hash: u64) -> Option<NonZeroU64> {
        let slot = hash as usize % DEFAULT_BUCKET_COUNT;
        self.index[slot]
    }

    /// Replaces the chain head for `hash`.
    fn set(&mut self, hash: u64, value: Option<NonZeroU64>) {
        let slot = hash as usize % DEFAULT_BUCKET_COUNT;
        self.index[slot] = value;
    }

    /// Serializes every bucket as a native-endian `u64`, with `None`
    /// encoded as zero — matching the layout `Archive` reads back.
    fn write(&self, file: &mut impl io::Write) -> Result<(), io::Error> {
        for &slot in &self.index {
            let raw = slot.map_or(0u64, u64::from);
            file.write_all(&raw.to_ne_bytes())?;
        }
        Ok(())
    }
}
/// Statistics gathered by [`Archive::verify`].
#[derive(Clone, Copy, Debug, Default)]
pub struct ArchiveStats {
    /// Number of stored objects.
    pub object_count: u64,
    /// Total bytes occupied by object blocks, padding included.
    pub object_size: u64,
    /// Total bytes of padding within object blocks.
    pub padding_size: u64,
    /// Number of empty blocks.
    pub empty_count: u64,
    /// Total bytes occupied by empty blocks.
    pub empty_size: u64,
    /// Size of the smallest empty block (0 if there are none).
    pub empty_min: u64,
    /// Size of the largest empty block (0 if there are none).
    pub empty_max: u64,
}
impl ArchiveStats {
    /// Prints a human-readable summary of the statistics to stdout.
    ///
    /// Averages are only printed when the respective count is non-zero,
    /// avoiding a division by zero.
    pub fn print(self) {
        println!(" object count: {}", self.object_count);
        if self.object_count > 0 {
            let size_avg = self.object_size / self.object_count;
            let padding_avg = self.padding_size / self.object_count;
            println!(" object size, sum: {}", self.object_size);
            println!(" object size, avg: {}", size_avg);
            println!(" object padding, sum: {}", self.padding_size);
            println!(" object padding, avg: {}", padding_avg);
        }
        println!(" empty block count: {}", self.empty_count);
        if self.empty_count > 0 {
            let empty_avg = self.empty_size / self.empty_count;
            println!(" empty size, sum: {}", self.empty_size);
            println!(" empty size, min: {}", self.empty_min);
            println!(" empty size, max: {}", self.empty_max);
            println!(" empty size, avg: {}", empty_avg);
        }
    }
}
// The SYSTEM byte encodes endianness and pointer width into the file
// magic, because the format serializes `usize` values in native byte
// order: an archive can only be read on a compatible system.
#[cfg(all(target_endian = "little", target_pointer_width = "16"))]
const SYSTEM: u8 = b'A';
#[cfg(all(target_endian = "little", target_pointer_width = "32"))]
const SYSTEM: u8 = b'B';
#[cfg(all(target_endian = "little", target_pointer_width = "64"))]
const SYSTEM: u8 = b'C';
#[cfg(all(target_endian = "big", target_pointer_width = "16"))]
const SYSTEM: u8 = b'D';
#[cfg(all(target_endian = "big", target_pointer_width = "32"))]
const SYSTEM: u8 = b'E';
#[cfg(all(target_endian = "big", target_pointer_width = "64"))]
const SYSTEM: u8 = b'F';
/// On-disk format version, part of the file magic.
const VERSION: u8 = 1;
/// Length of the file magic in bytes.
const MAGIC_SIZE: usize = 6;
/// The magic at the very start of every archive file.
const FILE_MAGIC: [u8; MAGIC_SIZE] = [
    b'R', b'T', b'N', b'R', VERSION, SYSTEM,
];
/// Backing storage for an [`Archive`]: a file, memory-mapped when
/// possible (Unix only), with plain seek/read/write as fallback.
#[derive(Debug)]
struct Storage {
    /// The underlying file; the mutex serializes seek-based access.
    file: Mutex<fs::File>,
    /// Current mapping of the whole file, if mapping succeeded.
    #[cfg(unix)]
    mmap: Option<mmapimpl::Mmap>,
    /// Whether the file (and thus any mapping) allows writes.
    #[cfg(unix)]
    writable: bool,
    /// Cached total size of the file in bytes.
    size: u64,
}
impl Storage {
    /// Wraps `file`, attempting to memory-map it (on Unix).
    #[allow(unused_variables)]
    pub fn new(file: fs::File, writable: bool) -> Result<Self, io::Error> {
        let mut res = Self {
            file: Mutex::new(file),
            #[cfg(unix)]
            mmap: None,
            #[cfg(unix)]
            writable,
            size: 0,
        };
        res.mmap()?;
        Ok(res)
    }

    /// (Re-)maps the file and refreshes the cached size.
    ///
    /// Called after anything that changes the file length, since the
    /// mapping always covers the whole file. Falls back to measuring the
    /// size via seek when mapping is unavailable.
    fn mmap(&mut self) -> Result<(), io::Error> {
        #[cfg(unix)]
        {
            self.mmap = mmapimpl::Mmap::new(
                &mut self.file.lock(), self.writable
            )?;
            if let Some(mmap) = self.mmap.as_ref() {
                self.size = mmap.size();
                return Ok(())
            }
        }
        let mut file = self.file.lock();
        file.seek(SeekFrom::End(0))?;
        self.size = file.stream_position()?;
        Ok(())
    }

    /// Runs `op` with a reader positioned at `start`.
    pub fn read<'s, T, E: From<ArchiveError>>(
        &'s self,
        start: u64,
        op: impl FnOnce(&mut StorageRead<'s>) -> Result<T, E>
    ) -> Result<T, E> {
        op(&mut StorageRead::new(self, start)?)
    }

    /// Runs `op` with a writer positioned at `start`.
    ///
    /// Writing exactly at the end of the file uses an appending writer
    /// (which grows the file and therefore invalidates the mapping);
    /// `finish` reports whether the file needs to be re-mapped.
    pub fn write<T>(
        &mut self,
        start: u64,
        op: impl FnOnce(&mut StorageWrite) -> Result<T, ArchiveError>
    ) -> Result<T, ArchiveError> {
        let mut write = if self.size == start {
            StorageWrite::new_append(self)?
        }
        else {
            StorageWrite::new(self, start)?
        };
        let res = op(&mut write)?;
        if write.finish()? {
            self.mmap()?;
        }
        Ok(res)
    }

    /// Truncates (or extends) the file and re-maps it.
    pub fn set_len(&mut self, len: u64) -> Result<(), ArchiveError> {
        self.file.lock().set_len(len)?;
        self.mmap()?;
        Ok(())
    }
}
/// A positioned reader over a [`Storage`].
#[derive(Debug)]
pub struct StorageRead<'a>(ReadInner<'a>);

/// The two read paths: straight from the mapping, or via the locked file.
#[derive(Debug)]
enum ReadInner<'a> {
    /// Reading from the memory mapping at an explicit position.
    #[cfg(unix)]
    Mmap {
        mmap: &'a mmapimpl::Mmap,
        pos: u64,
    },
    /// Reading through the file handle; the guard keeps the position
    /// stable for the lifetime of the reader.
    File {
        file: MutexGuard<'a, fs::File>,
    }
}
impl<'a> StorageRead<'a> {
    /// Creates a reader positioned at `start`, using the mapping when
    /// available and the locked file otherwise.
    fn new(storage: &'a Storage, start: u64) -> Result<Self, ArchiveError> {
        if start > storage.size {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "unexpected EOF"
            ).into())
        }
        #[cfg(unix)]
        if let Some(mmap) = storage.mmap.as_ref() {
            return Ok(StorageRead(
                ReadInner::Mmap { mmap, pos: start }
            ))
        }
        let mut file = storage.file.lock();
        file.seek(SeekFrom::Start(start))?;
        Ok(StorageRead(
            ReadInner::File { file }
        ))
    }

    /// Returns the current read position.
    pub fn pos(&mut self) -> Result<u64, ArchiveError> {
        match self.0 {
            #[cfg(unix)]
            ReadInner::Mmap { pos, .. } => Ok(pos),
            ReadInner::File { ref mut file } => Ok(file.stream_position()?),
        }
    }

    /// Fills `buf` completely from the current position, advancing it.
    pub fn read_into(
        &mut self, buf: &mut [u8]
    ) -> Result<(), ArchiveError> {
        match self.0 {
            #[cfg(unix)]
            ReadInner::Mmap { mmap, ref mut pos } => {
                *pos = mmap.read_into(*pos, buf)?;
                Ok(())
            }
            ReadInner::File { ref mut file } => {
                Ok(file.read_exact(buf)?)
            }
        }
    }

    /// Reads exactly `len` bytes, borrowed from the mapping if possible.
    pub fn read_slice(
        &mut self, len: usize,
    ) -> Result<Cow<'a, [u8]>, ArchiveError> {
        match self.0 {
            #[cfg(unix)]
            ReadInner::Mmap { mmap, ref mut pos } => {
                let (res, end) = mmap.read(*pos, len)?;
                *pos = end;
                Ok(res)
            }
            ReadInner::File { ref mut file } => {
                let mut buf = Vec::with_capacity(len);
                file.deref_mut().take(
                    u64::try_from(len).map_err(|_| {
                        io::Error::other("excessively large length")
                    })?
                ).read_to_end(&mut buf)?;
                // `read_to_end` on a `Take` stops silently at EOF, so a
                // truncated file would otherwise yield a short slice
                // without any error; detect that here.
                if buf.len() != len {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof, "unexpected EOF"
                    ).into())
                }
                Ok(buf.into())
            }
        }
    }

    /// Reads exactly `N` bytes into an array.
    pub fn read_array<const N: usize>(
        &mut self
    ) -> Result<[u8; N], ArchiveError> {
        let mut res = [0; N];
        self.read_into(&mut res)?;
        Ok(res)
    }

    /// Reads a native-endian `usize`.
    pub fn read_usize(&mut self) -> Result<usize, ArchiveError> {
        Ok(usize::from_ne_bytes(self.read_array()?))
    }

    /// Reads a native-endian `u64`.
    pub fn read_u64(&mut self) -> Result<u64, ArchiveError> {
        Ok(u64::from_ne_bytes(self.read_array()?))
    }

    /// Reads a `bool` encoded as a single 0 or 1 byte; anything else
    /// counts as corruption.
    pub fn read_bool(&mut self) -> Result<bool, ArchiveError> {
        match self.read_array::<1>()? {
            [0] => Ok(false),
            [1] => Ok(true),
            _ => Err(ArchiveError::Corrupt("illegal value in bool")),
        }
    }
}
/// A positioned writer over a [`Storage`] (or a raw `BufWriter` for
/// [`AppendArchive`]).
#[derive(Debug)]
pub struct StorageWrite<'a>(WriteInner<'a>);

/// The available write paths.
#[derive(Debug)]
enum WriteInner<'a> {
    /// Writing into the memory mapping at an explicit position.
    #[cfg(unix)]
    Mmap {
        mmap: &'a mut mmapimpl::Mmap,
        pos: u64,
    },
    /// Overwriting existing bytes through the locked file handle.
    Overwrite {
        file: MutexGuard<'a, fs::File>,
    },
    /// Appending at the end of the locked file (grows the file).
    Append {
        file: MutexGuard<'a, fs::File>,
    },
    /// Appending through an external buffered writer.
    RawAppend {
        file: &'a mut BufWriter<fs::File>,
    }
}
impl<'a> StorageWrite<'a> {
    /// Creates a writer that overwrites existing bytes starting at `pos`.
    ///
    /// Must stay within the current file; appending uses
    /// [`new_append`][Self::new_append] instead.
    fn new(
        storage: &'a mut Storage, pos: u64
    ) -> Result<Self, ArchiveError> {
        if pos >= storage.size {
            return Err(ArchiveError::Corrupt("update writer past end of file"))
        }
        #[cfg(unix)]
        match storage.mmap.as_mut() {
            Some(mmap) => {
                Ok(Self(WriteInner::Mmap { mmap, pos, }))
            }
            None => {
                let mut file = storage.file.lock();
                file.seek(SeekFrom::Start(pos))?;
                Ok(Self(WriteInner::Overwrite { file }))
            }
        }
        #[cfg(not(unix))]
        {
            let mut file = storage.file.lock();
            file.seek(SeekFrom::Start(pos))?;
            Ok(Self(WriteInner::Overwrite { file }))
        }
    }

    /// Creates a writer that appends at the end of the file.
    ///
    /// Drops any current mapping first, because appending changes the
    /// file length and the mapping always covers the whole file; the
    /// caller re-maps after `finish` reports `true`.
    fn new_append(storage: &'a mut Storage) -> Result<Self, ArchiveError> {
        #[cfg(unix)]
        if let Some(mmap) = storage.mmap.take() {
            drop(mmap)
        }
        let mut file = storage.file.lock();
        file.seek(SeekFrom::End(0))?;
        Ok(Self(WriteInner::Append { file }))
    }

    /// Creates a writer appending through an external buffered file.
    fn new_raw_append(file: &'a mut BufWriter<fs::File>) -> Self {
        Self(WriteInner::RawAppend { file })
    }

    /// Flushes the writer; returns whether the file grew, i.e. whether
    /// the caller needs to re-map the storage.
    fn finish(self) -> Result<bool, ArchiveError> {
        match self.0 {
            #[cfg(unix)]
            WriteInner::Mmap { mmap, .. } => {
                mmap.sync()?;
                Ok(false)
            }
            WriteInner::Overwrite { mut file } => {
                file.flush()?;
                Ok(false)
            }
            WriteInner::Append { mut file } => {
                file.flush()?;
                Ok(true)
            }
            WriteInner::RawAppend { file } => {
                file.flush()?;
                Ok(true)
            }
        }
    }

    /// Returns the current write position.
    pub fn pos(&mut self) -> Result<u64, io::Error> {
        match self.0 {
            #[cfg(unix)]
            WriteInner::Mmap { pos, .. } => Ok(pos),
            WriteInner::Overwrite { ref mut file } => file.stream_position(),
            WriteInner::Append { ref mut file } => file.stream_position(),
            WriteInner::RawAppend { ref mut file } => file.stream_position(),
        }
    }

    /// Writes all of `data` at the current position, advancing it.
    pub fn write(
        &mut self, data: &[u8]
    ) -> Result<(), ArchiveError> {
        match self.0 {
            #[cfg(unix)]
            WriteInner::Mmap { ref mut mmap, ref mut pos } => {
                *pos = mmap.write(*pos, data)?;
                Ok(())
            }
            WriteInner::Overwrite { ref mut file } => {
                Ok(file.write_all(data)?)
            }
            WriteInner::Append { ref mut file, .. } => {
                Ok(file.write_all(data)?)
            }
            WriteInner::RawAppend { ref mut file, .. } => {
                Ok(file.write_all(data)?)
            }
        }
    }

    /// Writes a native-endian `usize`.
    pub fn write_usize(&mut self, value: usize) -> Result<(), ArchiveError> {
        self.write(&value.to_ne_bytes())
    }

    /// Writes a native-endian `u64`.
    pub fn write_u64(&mut self, value: u64) -> Result<(), ArchiveError> {
        self.write(&value.to_ne_bytes())
    }

    /// Writes an optional `NonZeroUsize`, encoding `None` as zero.
    pub fn write_nonzero_usize(
        &mut self, value: Option<NonZeroUsize>
    ) -> Result<(), ArchiveError> {
        self.write(&value.map(Into::into).unwrap_or(0).to_ne_bytes())
    }

    /// Writes an optional `NonZeroU64`, encoding `None` as zero.
    pub fn write_nonzero_u64(
        &mut self, value: Option<NonZeroU64>
    ) -> Result<(), ArchiveError> {
        self.write(&value.map(Into::into).unwrap_or(0).to_ne_bytes())
    }

    /// Writes a `bool` as a single 0 or 1 byte.
    pub fn write_bool(
        &mut self, value: bool,
    ) -> Result<(), ArchiveError> {
        self.write(
            if value {
                b"\x01"
            }
            else {
                b"\x00"
            }
        )
    }
}
/// A thin wrapper around `mmap(2)` used to access the archive file
/// directly in memory (Unix only).
#[cfg(unix)]
mod mmapimpl {
    use std::{fs, io, slice};
    use std::borrow::Cow;
    use std::ffi::c_void;
    use std::io::{Seek, SeekFrom};
    use std::ptr::NonNull;
    use nix::sys::mman::{MapFlags, MsFlags, ProtFlags, mmap, msync, munmap};

    /// A shared mapping of an entire file.
    #[derive(Debug)]
    pub struct Mmap {
        /// Start of the mapping.
        ptr: NonNull<c_void>,
        /// Length of the mapping in bytes (the whole file).
        len: usize,
    }

    impl Mmap {
        /// Maps the whole of `file`.
        ///
        /// Returns `Ok(None)` when the file cannot be mapped — i.e. when
        /// its size is zero or does not fit the platform's address space
        /// — signalling the caller to fall back to plain file I/O.
        pub fn new(
            file: &mut fs::File,
            writable: bool,
        ) -> Result<Option<Self>, io::Error> {
            file.seek(SeekFrom::End(0))?;
            let size = file.stream_position()?;
            file.rewind()?;
            // u64 -> usize -> non-zero length; either conversion failing
            // means "don't map".
            let size = match usize::try_from(size).and_then(TryInto::try_into) {
                Ok(size) => size,
                Err(_) => return Ok(None)
            };
            // SAFETY: mapping a valid open file for its exact length;
            // write access is only requested when the file was opened
            // writable.
            let ptr = unsafe {
                mmap(
                    None, size,
                    if writable {
                        ProtFlags::PROT_READ | ProtFlags::PROT_WRITE
                    }
                    else {
                        ProtFlags::PROT_READ
                    },
                    MapFlags::MAP_SHARED,
                    file,
                    0
                )?
            };
            Ok(Some(Mmap { ptr, len: size.into() }))
        }

        /// The size of the mapping (i.e. of the file) in bytes.
        pub fn size(&self) -> u64 {
            super::usize_to_u64(self.len)
        }
    }

    impl Drop for Mmap {
        fn drop(&mut self) {
            // SAFETY: `ptr`/`len` describe the mapping created in `new`;
            // errors on unmap cannot be meaningfully handled here.
            unsafe {
                let _ = munmap(self.ptr, self.len); }
        }
    }

    impl Mmap {
        /// Views the whole mapping as a byte slice.
        fn as_slice(&self) -> &[u8] {
            // SAFETY: the mapping is valid for `len` bytes for the
            // lifetime of `self`.
            unsafe {
                slice::from_raw_parts(
                    self.ptr.as_ptr() as *const u8, self.len
                )
            }
        }

        /// Views the whole mapping as a mutable byte slice.
        fn as_slice_mut(&mut self) -> &mut [u8] {
            // SAFETY: the mapping is valid for `len` bytes and `&mut
            // self` gives exclusive access through this wrapper.
            unsafe {
                slice::from_raw_parts_mut(
                    self.ptr.as_ptr() as *mut u8, self.len
                )
            }
        }
    }

    impl Mmap {
        /// Copies bytes at `start` into `buf`; returns the end position.
        pub fn read_into(
            &self, start: u64, buf: &mut [u8]
        ) -> Result<u64, io::Error> {
            let (slice, end) = self.read(start, buf.len())?;
            buf.copy_from_slice(slice.as_ref());
            Ok(end)
        }

        /// Borrows `len` bytes at `start`; returns the slice and the end
        /// position. Any out-of-range access reports `UnexpectedEof`.
        pub fn read(
            &self, start: u64, len: usize,
        ) -> Result<(Cow<'_, [u8]>, u64), io::Error> {
            let start = match usize::try_from(start) {
                Ok(start) => start,
                Err(_) => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof, "unexpected EOF"
                    ))
                }
            };
            let end = match start.checked_add(len) {
                Some(end) => end,
                None => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof, "unexpected EOF"
                    ))
                }
            };
            if end > self.len {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof, "unexpected EOF"
                ))
            }
            Ok((self.as_slice()[start..end].into(), super::usize_to_u64(end)))
        }

        /// Copies `data` into the mapping at `start`; returns the end
        /// position. Any out-of-range access reports `UnexpectedEof`.
        pub fn write(
            &mut self, start: u64, data: &[u8]
        ) -> Result<u64, io::Error> {
            let start = match usize::try_from(start) {
                Ok(start) => start,
                Err(_) => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof, "unexpected EOF"
                    ))
                }
            };
            let end = match start.checked_add(data.len()) {
                Some(end) => end,
                None => {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof, "unexpected EOF"
                    ))
                }
            };
            if end > self.len {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof, "unexpected EOF"
                ))
            }
            self.as_slice_mut()[start..end].copy_from_slice(data);
            Ok(super::usize_to_u64(end))
        }

        /// Schedules the mapping to be flushed to disk (asynchronously).
        pub fn sync(&self) -> Result<(), io::Error> {
            // SAFETY: `ptr`/`len` describe the live mapping.
            unsafe {
                Ok(msync(self.ptr, self.len, MsFlags::MS_ASYNC)?)
            }
        }
    }

    // SAFETY: the mapping is plain memory owned by this wrapper.
    // NOTE(review): concurrent access appears to rely on `Storage`
    // serializing writers via `&mut` — confirm no aliasing writes occur.
    unsafe impl Sync for Mmap { }
    unsafe impl Send for Mmap { }
}
/// Losslessly converts a `usize` into a `u64`.
///
/// On the 16/32/64-bit targets this crate supports the cast can never
/// truncate; on any hypothetical wider platform the runtime assertion
/// catches an overflowing value before the cast.
const fn usize_to_u64(value: usize) -> u64 {
    #[cfg(not(any(
        target_pointer_width = "16",
        target_pointer_width = "32",
        target_pointer_width = "64",
    )))]
    assert!(value <= u64::MAX as usize);
    value as u64
}
/// Errors produced while reading from or writing to an archive.
#[derive(Debug)]
pub enum ArchiveError {
    /// The archive's on-disk structure is inconsistent.
    Corrupt(&'static str),
    /// An underlying I/O operation failed.
    Io(io::Error),
}

impl From<io::Error> for ArchiveError {
    fn from(err: io::Error) -> Self {
        Self::Io(err)
    }
}

impl fmt::Display for ArchiveError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::Corrupt(reason) => write!(f, "archive corrupted: {reason}"),
            Self::Io(err) => write!(f, "{err}"),
        }
    }
}
/// Errors produced when opening an existing archive.
#[derive(Debug)]
pub enum OpenError {
    /// The archive file does not exist.
    NotFound,
    /// The archive exists but could not be used.
    Archive(ArchiveError),
}

impl From<io::Error> for OpenError {
    fn from(err: io::Error) -> Self {
        ArchiveError::Io(err).into()
    }
}

impl From<ArchiveError> for OpenError {
    fn from(err: ArchiveError) -> Self {
        // A missing file is surfaced as the dedicated `NotFound` variant;
        // everything else is passed through unchanged.
        if let ArchiveError::Io(ref io_err) = err {
            if io_err.kind() == io::ErrorKind::NotFound {
                return Self::NotFound
            }
        }
        Self::Archive(err)
    }
}

impl fmt::Display for OpenError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::NotFound => f.write_str("not found"),
            Self::Archive(err) => write!(f, "{err}"),
        }
    }
}
#[derive(Debug)]
pub enum PublishError {
AlreadyExists,
Archive(ArchiveError),
}
impl From<ArchiveError> for PublishError {
fn from(err: ArchiveError) -> Self {
Self::Archive(err)
}
}
impl From<io::Error> for PublishError {
fn from(err: io::Error) -> Self {
Self::Archive(err.into())
}
}
/// Errors produced when accessing an existing object with a
/// consistency check.
#[derive(Debug)]
pub enum AccessError<T> {
    /// No object with the given name is stored.
    NotFound,
    /// The object's meta failed the caller's consistency check.
    Inconsistent(T),
    /// The archive itself failed.
    Archive(ArchiveError),
}

impl<T> From<ArchiveError> for AccessError<T> {
    fn from(err: ArchiveError) -> Self {
        Self::Archive(err)
    }
}
/// Errors produced when fetching an object's data.
#[derive(Debug)]
pub enum FetchError {
    /// No object with the given name is stored.
    NotFound,
    /// The archive itself failed.
    Archive(ArchiveError),
}

impl From<ArchiveError> for FetchError {
    fn from(err: ArchiveError) -> Self {
        Self::Archive(err)
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use std::collections::HashMap;

    /// A single archive operation for scenario-driven tests.
    #[derive(Clone, Copy, Debug)]
    enum Op {
        Publish { name: &'static [u8], data: &'static [u8] },
        Update { name: &'static [u8], data: &'static [u8] },
        Delete { name: &'static [u8] },
    }

    use self::Op::*;

    /// Trivial meta implementation: a fixed four-byte marker, verified
    /// on read so meta misplacement is caught immediately.
    impl ObjectMeta for () {
        const SIZE: usize = 4;
        type ConsistencyError = ();

        fn write(
            &self, write: &mut StorageWrite
        ) -> Result<(), ArchiveError> {
            write.write(b"abcd")
        }

        fn read(
            read: &mut StorageRead
        ) -> Result<Self, ArchiveError> {
            let slice = read.read_slice(4).unwrap();
            assert_eq!(slice.as_ref(), b"abcd");
            Ok(())
        }
    }

    /// Verifies the archive's invariants and that its objects are
    /// exactly those in `content`.
    fn check_archive(
        archive: &Archive<()>,
        content: &HashMap<&'static [u8], &'static [u8]>,
    ) {
        archive.verify().unwrap();
        let mut content = content.clone();
        for item in archive.objects().unwrap() {
            let (name, _, data) = item.unwrap();
            assert_eq!(
                content.remove(name.as_ref()),
                Some(data.as_ref())
            );
        }
        // Every expected object must have been seen exactly once.
        assert!(content.is_empty());
    }

    /// Applies `ops` to a fresh archive, mirroring them in a `HashMap`
    /// and checking archive and map agree after every step.
    fn run_archive(ops: impl IntoIterator<Item = Op>) {
        let mut archive = Archive::create_with_file(
            tempfile::tempfile().unwrap()
        ).unwrap();
        let mut content = HashMap::new();
        for item in ops {
            match item {
                Op::Publish { name, data } => {
                    assert!(content.insert(name, data).is_none());
                    archive.publish(name, &(), data).unwrap();
                    check_archive(&archive, &content);
                    assert_eq!(
                        archive.fetch(name).unwrap().as_ref(),
                        data
                    );
                }
                Op::Update { name, data } => {
                    assert!(content.insert(name, data).is_some());
                    archive.update(name, &(), data, |_| Ok(())).unwrap();
                    assert_eq!(
                        archive.fetch(name).unwrap().as_ref(),
                        data
                    );
                }
                Op::Delete { name } => {
                    assert!(content.remove(name).is_some());
                    archive.delete(name, |_| Ok(())).unwrap();
                    assert!(matches!(
                        archive.fetch(name),
                        Err(FetchError::NotFound)
                    ));
                }
            }
            check_archive(&archive, &content);
        }
    }

    #[test]
    fn empty_archive() {
        run_archive([])
    }

    /// Exercises block reuse: the large deleted object leaves an empty
    /// block that subsequent publishes can replace into.
    #[test]
    fn publish_replace() {
        run_archive([
            Publish { name: b"1", data: b"bar" },
            Publish { name: b"2", data: &[0; 1024]},
            Publish { name: b"3", data: b"aaa" },
            Delete { name: b"2" },
            Publish { name: b"4", data: b"bar" },
            Update { name: b"4", data: b"bar" },
        ])
    }
}
}