use std::{
any::Any,
borrow::Borrow,
cmp::Ordering,
fmt::Debug,
hash::Hash,
ops::{Bound, Deref, RangeBounds},
sync::Arc,
};
/// Interface for a blob storage backend: blobs are written as byte slices and
/// read back by passing a byte-slice id.
///
/// Implementors must be `Debug + Send + Sync + 'static`.
pub trait BlobStore: Debug + Send + Sync + 'static {
/// Error returned by the fallible operations. It must be constructible from
/// both [`NoError`] and `anyhow::Error`.
type Error: From<NoError> + From<anyhow::Error> + Debug;
/// Reads the blob identified by `id`.
fn read(&self, id: &[u8]) -> std::result::Result<OwnedBlob, Self::Error>;
/// Writes `data` to the store and returns a byte vector.
///
/// NOTE(review): the returned bytes are presumably the id to hand to
/// `read` later; confirm against the implementors.
fn write(&self, data: &[u8]) -> std::result::Result<Vec<u8>, Self::Error>;
/// Synchronizes the store.
///
/// NOTE(review): presumably flushes pending writes; the exact guarantee is
/// up to each implementor, so confirm before relying on it.
fn sync(&self) -> std::result::Result<(), Self::Error>;
/// Reports whether this store needs a "deep detach".
///
/// The default implementation returns `true`.
/// NOTE(review): what a deep detach involves is not visible in this file.
fn needs_deep_detach(&self) -> bool {
true
}
}
/// A byte slice paired with an optional reference-counted owner.
///
/// Cloning a `Blob` copies the slice reference and clones the `Arc` owner
/// (if any); it never copies the bytes themselves.
#[derive(Debug, Clone)]
pub struct Blob<'a> {
// The bytes this blob exposes through `AsRef`, `Deref` and `Borrow`.
data: &'a [u8],
// Optional keep-alive handle. The constructors in this file that set it
// (e.g. `from_arc_vec`) make `data` point into memory held by this `Arc`.
// NOTE(review): callers of `owned_new` have to uphold the same relationship.
owner: Option<Arc<dyn Any>>,
}
// SAFETY / NOTE(review): `owner` is an `Arc<dyn Any>` with no `Send + Sync`
// bound, so the compiler would not derive these impls automatically. They are
// only sound if every owner ever stored in a `Blob` is itself safe to share
// and send across threads. Nothing visible in this file enforces that, so
// confirm it at every `owned_new` call site.
unsafe impl<'a> Sync for Blob<'a> {}
unsafe impl<'a> Send for Blob<'a> {}
/// Exposes the blob's bytes as a plain `&[u8]`.
impl<'a> AsRef<[u8]> for Blob<'a> {
    fn as_ref(&self) -> &[u8] {
        let bytes: &[u8] = self.data;
        bytes
    }
}
/// Lets a blob be used wherever a `[u8]` is expected, giving access to all
/// read-only slice methods.
impl<'a> Deref for Blob<'a> {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        // Same bytes that `AsRef<[u8]>` hands out.
        self.data
    }
}
/// Allows a blob to be borrowed as `[u8]`, e.g. for lookups in collections
/// keyed by blobs using a plain byte slice.
impl<'a> Borrow<[u8]> for Blob<'a> {
    fn borrow(&self) -> &[u8] {
        // Same bytes that `AsRef<[u8]>` hands out.
        self.data
    }
}
/// A [`Blob`] whose slice carries the `'static` lifetime.
///
/// The constructors in this file (`from_arc_vec`, `copy_from_slice`, the
/// `From<Arc<..>>` impls) build such blobs by pointing the slice into a buffer
/// held by the blob's `Arc` owner, so the bytes are only valid for as long as
/// that owner is kept alongside them.
pub(crate) type OwnedBlob = Blob<'static>;
/// Hashes a blob exactly like the byte slice it wraps; the owner takes no
/// part in the hash.
impl<'a> Hash for Blob<'a> {
    fn hash<H>(&self, hasher: &mut H)
    where
        H: std::hash::Hasher,
    {
        <[u8] as Hash>::hash(self.data, hasher)
    }
}
/// Two blobs are equal when their bytes are equal, regardless of lifetimes
/// or owners.
impl<'a, 'b> PartialEq<Blob<'b>> for Blob<'a> {
    fn eq(&self, rhs: &Blob<'b>) -> bool {
        let (left, right): (&[u8], &[u8]) = (self.data, rhs.data);
        left == right
    }
}
// Marker impl: blob equality is plain byte-slice equality (see the
// `PartialEq` impl for `Blob`), which is a full equivalence relation.
impl<'a> Eq for Blob<'a> {}
/// Orders blobs by lexicographic comparison of their bytes; the result is
/// always `Some(..)` because byte slices are totally ordered.
impl<'a, 'b> PartialOrd<Blob<'b>> for Blob<'a> {
    fn partial_cmp(&self, rhs: &Blob<'b>) -> Option<Ordering> {
        Some(Ord::cmp(self.data, rhs.data))
    }
}
/// Total order on blobs: lexicographic comparison of their bytes.
impl<'a> Ord for Blob<'a> {
    fn cmp(&self, rhs: &Self) -> Ordering {
        <[u8] as Ord>::cmp(self.data, rhs.data)
    }
}
impl<'a> Blob<'a> {
    /// Creates a blob with no bytes and no owner.
    pub fn empty() -> Self {
        Self::new(&[])
    }

    /// Wraps a borrowed slice. The resulting blob has no owner and is only
    /// valid for the lifetime of `data`.
    pub fn new(data: &'a [u8]) -> Self {
        Self { data, owner: None }
    }

    /// Copies `data` into a freshly allocated, reference-counted buffer and
    /// returns an owned blob viewing that buffer.
    pub fn copy_from_slice(data: &[u8]) -> OwnedBlob {
        OwnedBlob::from_arc_vec(Arc::new(data.to_vec()))
    }

    /// Builds an owned blob that views the contents of `arc` without copying
    /// and keeps `arc` as its owner.
    pub fn from_arc_vec(arc: Arc<Vec<u8>>) -> OwnedBlob {
        let data: &[u8] = arc.as_ref();
        // SAFETY: the bytes live in the heap buffer of the `Vec` inside `arc`.
        // `arc` is stored as the blob's owner right next to the slice, so the
        // buffer stays allocated for as long as the blob (or any clone or
        // sub-slice carrying the same owner) exists. The `'static` lifetime is
        // never handed out directly: accessors tie the bytes to `&self`.
        let data: &'static [u8] = unsafe { std::mem::transmute(data) };
        OwnedBlob::owned_new(data, Some(arc))
    }

    /// Converts this blob into an [`OwnedBlob`].
    ///
    /// A blob that already has an owner is converted without copying; a
    /// purely borrowed blob has its bytes copied into a new buffer.
    pub fn to_owned(self) -> OwnedBlob {
        match self.owner {
            Some(owner) => OwnedBlob {
                // SAFETY: relies on the invariant that whenever `owner` is
                // `Some`, `data` points into memory kept alive by that owner.
                // The constructors in this impl uphold it; NOTE(review):
                // callers of `owned_new` must uphold it as well.
                data: unsafe { std::mem::transmute::<&'a [u8], &'static [u8]>(self.data) },
                owner: Some(owner),
            },
            None => OwnedBlob::copy_from_slice(self.data),
        }
    }

    /// Returns a blob covering `range` of this blob's bytes, sharing the same
    /// owner (if any).
    ///
    /// An empty range yields an owner-less empty blob.
    ///
    /// # Panics
    ///
    /// Panics if a bound cannot be converted to an exclusive index without
    /// overflowing `usize`, if the start is greater than the end, or if the
    /// end is past the length of the blob.
    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
        let len = self.data.len();
        let begin = match range.start_bound() {
            Bound::Included(&n) => n,
            // Checked like the end bound below: a plain `n + 1` would wrap to
            // 0 in release builds and silently select the wrong range.
            Bound::Excluded(&n) => n.checked_add(1).expect("out of range"),
            Bound::Unbounded => 0,
        };
        let end = match range.end_bound() {
            Bound::Included(&n) => n.checked_add(1).expect("out of range"),
            Bound::Excluded(&n) => n,
            Bound::Unbounded => len,
        };
        assert!(
            begin <= end,
            "range start must not be greater than end: {:?} <= {:?}",
            begin,
            end,
        );
        assert!(
            end <= len,
            "range end out of bounds: {:?} <= {:?}",
            end,
            len,
        );
        if end == begin {
            // Nothing to view, so there is no need to keep the owner alive.
            return Self::empty();
        }
        Self {
            data: &self.data[begin..end],
            owner: self.owner.clone(),
        }
    }

    /// Returns a blob for `subset`, which must be a slice located inside this
    /// blob's bytes (same memory, not merely equal content). The result
    /// shares this blob's owner.
    ///
    /// An empty `subset` yields an owner-less empty blob without any check.
    ///
    /// # Panics
    ///
    /// Panics if `subset` does not lie entirely within this blob's memory.
    pub fn slice_ref(&self, subset: &[u8]) -> Self {
        if subset.is_empty() {
            return Self::empty();
        }
        // Compare addresses as integers to locate `subset` inside `self`.
        let bytes_p = self.data.as_ptr() as usize;
        let bytes_len = self.data.len();
        let sub_p = subset.as_ptr() as usize;
        let sub_len = subset.len();
        assert!(
            sub_p >= bytes_p,
            "subset pointer ({:p}) is smaller than self pointer ({:p})",
            sub_p as *const u8,
            bytes_p as *const u8,
        );
        assert!(
            sub_p + sub_len <= bytes_p + bytes_len,
            "subset is out of bounds: self = ({:p}, {}), subset = ({:p}, {})",
            bytes_p as *const u8,
            bytes_len,
            sub_p as *const u8,
            sub_len,
        );
        let sub_offset = sub_p - bytes_p;
        self.slice(sub_offset..(sub_offset + sub_len))
    }
}
/// Zero-copy conversion: the blob views the vector's contents and keeps the
/// `Arc` as its owner.
impl From<Arc<Vec<u8>>> for OwnedBlob {
    fn from(v: Arc<Vec<u8>>) -> Self {
        let raw: *const [u8] = v.as_slice();
        // SAFETY: `raw` points at the heap buffer of the `Vec` inside `v`.
        // `v` is moved into the blob as its owner on the next line, so that
        // buffer stays allocated for as long as the blob holds the owner.
        let bytes: &'static [u8] = unsafe { &*raw };
        Self::owned_new(bytes, Some(v))
    }
}
/// Zero-copy conversion: the blob views the array's contents and keeps the
/// `Arc` as its owner.
impl<const N: usize> From<Arc<[u8; N]>> for OwnedBlob {
    fn from(v: Arc<[u8; N]>) -> Self {
        let view: &[u8] = &v[..];
        let raw: *const [u8] = view;
        // SAFETY: `raw` points at the array stored inside the `Arc`
        // allocation. `v` is moved into the blob as its owner on the next
        // line, so the array stays alive for as long as the blob holds it.
        let bytes: &'static [u8] = unsafe { &*raw };
        Self::owned_new(bytes, Some(v))
    }
}
impl OwnedBlob {
pub fn owned_new(data: &'static [u8], owner: Option<Arc<dyn Any>>) -> OwnedBlob {
Self { data, owner }
}
}
/// A shared, type-erased [`BlobStore`] whose error type is `anyhow::Error`.
pub type DynBlobStore = Arc<dyn BlobStore<Error = anyhow::Error>>;
/// Makes the shared handle itself usable as a [`BlobStore`]: every operation
/// is forwarded unchanged to the trait object behind the `Arc`.
impl BlobStore for DynBlobStore {
    type Error = anyhow::Error;

    fn read(&self, id: &[u8]) -> std::result::Result<OwnedBlob, Self::Error> {
        // `**self` is the `dyn BlobStore` inside the `Arc`, so this dispatches
        // to the wrapped store rather than back into this impl.
        (**self).read(id)
    }

    fn write(&self, data: &[u8]) -> std::result::Result<Vec<u8>, Self::Error> {
        (**self).write(data)
    }

    fn sync(&self) -> std::result::Result<(), Self::Error> {
        (**self).sync()
    }

    fn needs_deep_detach(&self) -> bool {
        (**self).needs_deep_detach()
    }
}
/// A [`BlobStore`] that supports no storage operations: its `read`, `write`
/// and `sync` all panic, and its `needs_deep_detach` returns `false`.
#[derive(Debug, Clone, Default)]
pub struct Detached;
/// [`BlobStore`] implementation for [`Detached`]: no storage operation is
/// supported, so `read`, `write` and `sync` panic when called.
impl BlobStore for Detached {
    type Error = NoError;

    /// Not supported.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn read(&self, _id: &[u8]) -> std::result::Result<OwnedBlob, Self::Error> {
        panic!("Detached blob store does not support `read`")
    }

    /// Not supported.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn write(&self, _data: &[u8]) -> std::result::Result<Vec<u8>, Self::Error> {
        panic!("Detached blob store does not support `write`")
    }

    /// Not supported.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn sync(&self) -> std::result::Result<(), Self::Error> {
        panic!("Detached blob store does not support `sync`")
    }

    /// Always `false`, overriding the trait's default of `true`.
    fn needs_deep_detach(&self) -> bool {
        false
    }
}
/// An error type with no variants, so no value of it can ever be constructed.
///
/// Used as the `Error` type of [`Detached`]; a `Result<T, NoError>` can only
/// ever be `Ok`.
#[derive(Debug)]
pub enum NoError {}
/// Converts the uninhabited [`NoError`] into an `anyhow::Error`.
///
/// `NoError` has no variants, so no value can ever be passed in and this
/// conversion can never actually run. It exists so that `anyhow::Error`
/// satisfies the `From<NoError>` bound on `BlobStore::Error`.
impl From<NoError> for anyhow::Error {
    fn from(never: NoError) -> Self {
        // An empty match is exhaustive for an enum without variants, so the
        // compiler proves this is unreachable and no runtime panic is needed.
        match never {}
    }
}
impl From<anyhow::Error> for NoError {
fn from(_: anyhow::Error) -> Self {
panic!()
}
}
/// Extension trait for extracting a `T` from a container via `unwrap_safe`.
///
/// The only implementation in this file is for `Result<T, NoError>`, whose
/// error type has no values.
pub trait UnwrapSafeExt<T> {
/// Consumes `self` and returns the contained value.
fn unwrap_safe(self) -> T;
}
/// A `Result<T, NoError>` can only ever be `Ok`, because `NoError` has no
/// values, so the payload can be extracted unconditionally.
impl<T> UnwrapSafeExt<T> for Result<T, NoError> {
    fn unwrap_safe(self) -> T {
        // The closure can never be called: there is no `NoError` value to
        // pass to it, and the empty match proves that to the compiler.
        self.unwrap_or_else(|never| match never {})
    }
}
// Tests for the blob types, plus proptest strategies producing lists of byte
// blocks. Several helpers are currently unused, hence `allow(dead_code)`.
#[cfg(test)]
mod tests {
#![allow(dead_code)]
use proptest::prelude::*;
use tempfile::tempfile;
use std::sync::Arc;
use std::any::Any;
use crate::store::blob_store::OwnedBlob;
// Base size from which the block-length ranges of the strategies are derived.
const TEST_SIZE: usize = 1024;
/// Builds an [`OwnedBlob`] over `slice` with `owner` attached, extending the
/// slice's lifetime to `'static` without any check.
///
/// # Safety
///
/// NOTE(review): the caller presumably has to guarantee that `owner` keeps
/// the memory behind `slice` alive; nothing here verifies it.
unsafe fn custom_new(slice: &[u8], owner: Arc<dyn Any>) -> OwnedBlob {
let slice: &'static [u8] = std::mem::transmute(slice);
OwnedBlob::owned_new(slice, Some(owner))
}
/// Writes 1 MiB of zeros to a temporary file, memory-maps it, and builds an
/// `OwnedBlob` over bytes `10..10000` of the mapping with the `Arc`'d mapping
/// as owner. The test has no assertions; it only checks that this succeeds.
#[test]
fn zero_copy_mmap() -> anyhow::Result<()> {
use memmap::MmapOptions;
use std::{io::Write, sync::Arc};
let mut large_file = tempfile().unwrap();
large_file.write_all(&[0u8; 1024 * 1024])?;
let mmap = Arc::new(unsafe { MmapOptions::new().map(&large_file).unwrap() });
// The lifetime is extended to `'static` by hand; the mapping is passed as
// the blob's owner on the next line.
let slice: &'static [u8] = unsafe { std::mem::transmute(&mmap[10..10000]) };
let _bytes = OwnedBlob::owned_new(slice, Some(mmap.clone()));
Ok(())
}
/// Strategy: a list whose size is drawn from `1..10`, of byte blocks whose
/// lengths are drawn from `0..TEST_SIZE - 4`.
fn large_blocks() -> impl Strategy<Value = Vec<Vec<u8>>> {
proptest::collection::vec(
proptest::collection::vec(any::<u8>(), 0..TEST_SIZE - 4),
1..10,
)
}
/// Strategy: a list whose size is drawn from `1..100`, of byte blocks whose
/// lengths are drawn from `0..TEST_SIZE / 10`.
fn small_blocks() -> impl Strategy<Value = Vec<Vec<u8>>> {
proptest::collection::vec(
proptest::collection::vec(any::<u8>(), 0..TEST_SIZE / 10),
1..100,
)
}
/// Strategy choosing between `large_blocks` and `small_blocks`.
fn test_blocks() -> impl Strategy<Value = Vec<Vec<u8>>> {
prop_oneof![large_blocks(), small_blocks(),]
}
// No property tests are defined yet; the strategies above are unused.
proptest! {}
}