use archive::{ArchiveBuilder, Index, Struct, VariadicStruct};
use error::ResourceStorageError;
use memory::{SizeType, PADDING_SIZE};
use multivector::MultiVector;
use vector::ExternalVector;
use std::cell::RefCell;
use std::fmt;
use std::io::{self, Seek, Write};
use std::mem;
use std::ops::DerefMut;
use std::ptr;
use std::rc::Rc;
use std::slice;
use std::str;
/// Output sink for resource data: anything that is both writable and seekable.
///
/// Seeking is required because `ResourceHandle::close` rewinds to the start of
/// the stream to overwrite the size prefix with the final payload size.
pub trait Stream: Write + Seek {}
/// Storage backend for named resources and their schemas.
///
/// Every resource `name` is accompanied by a second resource `name.schema`
/// holding the schema text. Resource data is framed as a size prefix of type
/// `SizeType`, followed by the payload, followed by `PADDING_SIZE` bytes of
/// padding.
pub trait ResourceStorage {
    /// Reads `resource_name` and verifies that its stored schema equals `schema`.
    ///
    /// This simply forwards to [`read_and_check_schema`](#method.read_and_check_schema).
    fn read(
        &mut self,
        resource_name: &str,
        schema: &str,
    ) -> Result<MemoryDescriptor, ResourceStorageError> {
        self.read_and_check_schema(resource_name, schema)
    }

    /// Writes `data` (framed with size prefix and padding) as `resource_name`
    /// and `schema` as `<resource_name>.schema`.
    ///
    /// # Errors
    ///
    /// Returns the first I/O error raised while creating or writing either
    /// stream. The data stream is written before the schema stream.
    fn write(&mut self, resource_name: &str, schema: &str, data: &[u8]) -> io::Result<()> {
        let stream = self.create_output_stream(resource_name)?;
        let mut mut_stream = stream.borrow_mut();
        write_to_stream(data, mut_stream.deref_mut())?;

        let schema_name = format!("{}.schema", resource_name);
        let stream = self.create_output_stream(&schema_name)?;
        let mut mut_stream = stream.borrow_mut();
        write_schema(schema, mut_stream.deref_mut())
    }

    /// Returns the storage associated with the sub-directory `dir`; the exact
    /// semantics are defined by the implementor.
    fn subdir(&self, dir: &str) -> Rc<RefCell<ResourceStorage>>;

    /// Reports whether a resource called `resource_name` is present.
    fn exists(&self, resource_name: &str) -> bool;

    /// Returns the raw bytes stored under `resource_name`, without any
    /// validation of framing or schema.
    fn read_resource(&mut self, resource_name: &str) -> Result<MemoryDescriptor, io::Error>;

    /// Creates an output stream for the resource called `resource_name`.
    fn create_output_stream(&mut self, resource_name: &str) -> io::Result<Rc<RefCell<Stream>>>;

    /// Reads `resource_name`, validates its framing, and checks that the schema
    /// stored in `<resource_name>.schema` equals `expected_schema`.
    ///
    /// On success the returned descriptor points just past the size prefix and
    /// spans exactly the payload bytes.
    ///
    /// # Errors
    ///
    /// * An I/O error converted through `ResourceStorageError::from_io_error`
    ///   if the resource or its schema cannot be read.
    /// * `ResourceStorageError::UnexpectedDataSize` if the data is too short to
    ///   hold the size prefix and padding, or if the stored size does not match
    ///   the number of bytes actually present.
    /// * `ResourceStorageError::Utf8Error` if the stored schema is not UTF-8.
    /// * `ResourceStorageError::WrongSignature` (carrying a line diff) if the
    ///   stored schema differs from `expected_schema`.
    fn read_and_check_schema(
        &mut self,
        resource_name: &str,
        expected_schema: &str,
    ) -> Result<MemoryDescriptor, ResourceStorageError> {
        let data = self
            .read_resource(resource_name)
            .map_err(|e| ResourceStorageError::from_io_error(e, resource_name.into()))?;

        let schema_name = format!("{}.schema", resource_name);
        // NOTE(review): a failure to read the schema is reported under
        // `resource_name`, not `schema_name` — confirm this is intended.
        let schema = self
            .read_resource(&schema_name)
            .map_err(|e| ResourceStorageError::from_io_error(e, resource_name.into()))?;

        // Bytes that surround the payload: size prefix in front, padding behind.
        let framing_size = mem::size_of::<SizeType>() + PADDING_SIZE;
        if data.size_in_bytes() < framing_size {
            return Err(ResourceStorageError::UnexpectedDataSize);
        }

        let size = read_bytes!(SizeType, data.data()) as usize;
        // `size` comes from stored data and may be arbitrarily large, so adding
        // to it could overflow. Subtract from the total instead: the guard
        // above guarantees this cannot underflow.
        if size != data.size_in_bytes() - framing_size {
            return Err(ResourceStorageError::UnexpectedDataSize);
        }

        // `slice::from_raw_parts` requires a non-null pointer even for empty
        // slices, and an empty descriptor may carry a null pointer.
        let stored_schema_slice: &[u8] = if schema.size_in_bytes() == 0 {
            &[]
        } else {
            // Relies on `read_resource` returning a descriptor whose pointer is
            // valid for `size_in_bytes()` bytes.
            unsafe { slice::from_raw_parts(schema.data(), schema.size_in_bytes()) }
        };
        let stored_schema =
            str::from_utf8(stored_schema_slice).map_err(ResourceStorageError::Utf8Error)?;
        if stored_schema != expected_schema {
            return Err(ResourceStorageError::WrongSignature {
                resource_name: resource_name.into(),
                diff: diff(stored_schema, expected_schema),
            });
        }

        Ok(MemoryDescriptor::new(
            // Skip the size prefix; in bounds because the data holds at least
            // `framing_size` bytes.
            unsafe { data.data().offset(mem::size_of::<SizeType>() as isize) },
            size,
        ))
    }
}
/// Creates an `ExternalVector` that writes its elements to `resource_name`.
///
/// The schema text is written to `<resource_name>.schema` first; afterwards the
/// data stream is created and wrapped in a `ResourceHandle` backing the vector.
///
/// # Errors
///
/// Returns any I/O error raised while creating either stream, writing the
/// schema, or initializing the resource handle.
pub fn create_external_vector<T: Struct>(
    storage: &mut ResourceStorage,
    resource_name: &str,
    schema: &str,
) -> io::Result<ExternalVector<T>> {
    // The schema goes out before the data stream is even created.
    let schema_stream = storage.create_output_stream(&format!("{}.schema", resource_name))?;
    schema_stream.borrow_mut().write_all(schema.as_bytes())?;

    let data_stream = storage.create_output_stream(resource_name)?;
    let vector = ExternalVector::new(ResourceHandle::new(data_stream)?);
    Ok(vector)
}
/// Creates a `MultiVector` that writes its items to `resource_name`.
///
/// Three things are set up, in this order:
///
/// 1. an external vector `<resource_name>_index` with schema `index(<schema>)`,
/// 2. the schema file `<resource_name>.schema`,
/// 3. the data stream `resource_name`, wrapped in a `ResourceHandle`.
///
/// # Errors
///
/// Returns the first I/O error raised by any of the steps above.
pub fn create_multi_vector<Idx: Index, Ts: VariadicStruct>(
    storage: &mut ResourceStorage,
    resource_name: &str,
    schema: &str,
) -> io::Result<MultiVector<Idx, Ts>> {
    // The index is an external vector of its own, stored next to the data.
    let index = create_external_vector(
        storage,
        &format!("{}_index", resource_name),
        &format!("index({})", schema),
    )?;

    let schema_stream = storage.create_output_stream(&format!("{}.schema", resource_name))?;
    schema_stream.borrow_mut().write_all(schema.as_bytes())?;

    let data_stream = storage.create_output_stream(resource_name)?;
    let data_handle = ResourceHandle::new(data_stream)?;
    Ok(MultiVector::new(index, data_handle))
}
/// Writes the signature resource `<T::NAME>.archive` for archive type `T`.
///
/// The signature has an empty payload and carries `T::SCHEMA` as its schema.
///
/// # Errors
///
/// * An `AlreadyExists` I/O error, converted through
///   `ResourceStorageError::from_io_error`, if the signature resource is
///   already present in `storage`.
/// * Any I/O error raised while writing the signature, converted the same way.
pub fn create_archive<T: ArchiveBuilder>(
    storage: &Rc<RefCell<ResourceStorage>>,
) -> Result<(), ResourceStorageError> {
    let signature_name = format!("{}.archive", T::NAME);

    // Each `RefCell` borrow is confined to a single statement so the shared
    // and the mutable borrow below never overlap.
    let already_present = storage.borrow().exists(&signature_name);
    if already_present {
        let cause = io::Error::new(io::ErrorKind::AlreadyExists, signature_name.clone());
        return Err(ResourceStorageError::from_io_error(cause, signature_name));
    }

    let outcome = storage.borrow_mut().write(&signature_name, T::SCHEMA, &[]);
    match outcome {
        Ok(()) => Ok(()),
        Err(e) => Err(ResourceStorageError::from_io_error(e, signature_name)),
    }
}
/// A raw, non-owning view of a byte range: start pointer plus length.
///
/// The descriptor only stores the two values; it neither dereferences the
/// pointer nor manages the memory behind it.
#[derive(Debug, Clone)]
pub struct MemoryDescriptor {
    /// Address of the first byte (null for the default descriptor).
    ptr: *const u8,
    /// Length of the range in bytes.
    size: usize,
}

impl Default for MemoryDescriptor {
    /// Returns a descriptor with a null pointer and a size of zero.
    fn default() -> Self {
        Self::new(ptr::null(), 0)
    }
}

impl MemoryDescriptor {
    /// Creates a descriptor from a start pointer and a size in bytes.
    pub fn new(ptr: *const u8, size: usize) -> Self {
        Self {
            ptr: ptr,
            size: size,
        }
    }

    /// Returns the pointer to the first byte of the described range.
    pub fn data(&self) -> *const u8 {
        self.ptr
    }

    /// Returns the length of the described range in bytes.
    pub fn size_in_bytes(&self) -> usize {
        self.size
    }
}
/// Write handle for a single resource whose payload is produced incrementally.
///
/// It counts the payload bytes written through `write` so that `close` can
/// store the final size at the beginning of the stream.
#[derive(Clone)]
pub struct ResourceHandle {
/// Shared output stream; set to `None` once the handle has been closed.
stream: Option<Rc<RefCell<Stream>>>,
/// Number of payload bytes successfully written so far.
size_in_bytes: usize,
}
impl ResourceHandle {
pub fn new(stream: Rc<RefCell<Stream>>) -> io::Result<Self> {
{
let mut mut_stream = stream.borrow_mut();
write_size(0u64, mut_stream.deref_mut())?;
}
Ok(Self {
stream: Some(stream),
size_in_bytes: 0,
})
}
pub fn is_open(&self) -> bool {
self.stream.is_some()
}
pub fn write(&mut self, data: &[u8]) -> io::Result<()> {
let stream = self
.stream
.as_ref()
.ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "stream closed"))?;
let res = stream.borrow_mut().write_all(data);
if res.is_ok() {
self.size_in_bytes += data.len();
}
res
}
pub fn close(&mut self) -> io::Result<()> {
{
let stream = self
.stream
.as_ref()
.ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "stream closed"))?;
let mut mut_stream = stream.borrow_mut();
write_padding(mut_stream.deref_mut())?;
mut_stream.seek(io::SeekFrom::Start(0u64))?;
write_size(self.size_in_bytes as u64, mut_stream.deref_mut())?;
}
self.stream = None;
Ok(())
}
}
/// Debug output shows whether the handle is open and how many bytes were
/// written; the stream itself is not printed.
impl fmt::Debug for ResourceHandle {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let open = self.is_open();
        let written = self.size_in_bytes;
        write!(
            f,
            "ResourceHandle {{ is_open: {}, size_in_bytes: {} }}",
            open, written,
        )
    }
}
/// Renders a line-based diff of `left` against `right`.
///
/// Each output line starts with a one-character marker: `-` for lines only in
/// `left`, `+` for lines only in `right`, and a space for lines present in
/// both. Lines are joined with `\n` and there is no trailing newline.
fn diff(left: &str, right: &str) -> String {
    use diff;

    let mut rendered = Vec::new();
    for change in diff::lines(left, right) {
        let (marker, text) = match change {
            diff::Result::Left(removed) => ('-', removed),
            diff::Result::Both(common, _) => (' ', common),
            diff::Result::Right(added) => ('+', added),
        };
        rendered.push(format!("{}{}", marker, text));
    }
    rendered.join("\n")
}
/// Writes `data` to `stream` in framed form: the payload length as a size
/// prefix, then the payload itself, then the padding.
///
/// # Errors
///
/// Returns the first I/O error raised by any of the three writes.
fn write_to_stream(data: &[u8], stream: &mut Stream) -> io::Result<()> {
    let payload_len = data.len() as u64;
    write_size(payload_len, stream)?;
    stream.write_all(data)?;
    write_padding(stream)?;
    Ok(())
}
/// Writes the schema text to `stream` as raw UTF-8 bytes, without any size
/// prefix or padding.
fn write_schema(schema: &str, stream: &mut Stream) -> io::Result<()> {
    let schema_bytes = schema.as_bytes();
    stream.write_all(schema_bytes)
}
/// Encodes `value` with `write_bytes!` into a buffer exactly as wide as
/// `SizeType` and writes that buffer to `stream`.
fn write_size(value: SizeType, stream: &mut Stream) -> io::Result<()> {
    const WIDTH_IN_BYTES: usize = mem::size_of::<SizeType>();
    let mut encoded = [0u8; WIDTH_IN_BYTES];
    // Offset 0, full bit width: the value occupies the whole buffer.
    write_bytes!(SizeType; value, &mut encoded, 0, WIDTH_IN_BYTES * 8);
    stream.write_all(&encoded)
}
/// Writes `PADDING_SIZE` zero bytes to `stream`.
fn write_padding(stream: &mut Stream) -> io::Result<()> {
    stream.write_all(&[0u8; PADDING_SIZE])
}