use crate::{
error::ResourceStorageError,
memory::{SizeType, PADDING_SIZE},
multivector::MultiVector,
structs::{Struct, VariadicRefFactory},
vector::ExternalVector,
};
use std::{
fmt,
io::{self, Seek, Write},
mem, str,
sync::Arc,
};
use diff;
/// Shared, reference-counted handle to a storage implementation that may be used across threads.
pub type StorageHandle = Arc<dyn ResourceStorage + Send + Sync>;
/// An output sink for resources: anything that can be both written and seeked.
pub trait Stream: Write + Seek {}

/// Every `Write + Seek` type automatically qualifies as a [`Stream`].
impl<S> Stream for S where S: Write + Seek {}
pub trait ResourceStorage: std::fmt::Debug {
fn read(&self, resource_name: &str, schema: &str) -> Result<&[u8], ResourceStorageError> {
self.read_and_check_schema(resource_name, schema)
}
fn write(&self, resource_name: &str, schema: &str, data: &[u8]) -> io::Result<()> {
let mut stream = self.create_output_stream(resource_name)?;
write_to_stream(data, &mut stream)?;
let schema_name = format!("{}.schema", resource_name);
let mut stream = self.create_output_stream(&schema_name)?;
write_schema(schema, &mut stream)
}
fn subdir(&self, dir: &str) -> StorageHandle;
fn exists(&self, resource_name: &str) -> bool;
fn read_resource(&self, resource_name: &str) -> Result<&[u8], io::Error>;
fn create_output_stream(&self, resource_name: &str) -> io::Result<Box<dyn Stream>>;
fn read_and_check_schema(
&self,
resource_name: &str,
expected_schema: &str,
) -> Result<&[u8], ResourceStorageError> {
let data = self.read_resource(resource_name).map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
ResourceStorageError::Missing
} else {
ResourceStorageError::from_io_error(e, resource_name.into())
}
})?;
let schema_name = format!("{}.schema", resource_name);
let schema = self
.read_resource(&schema_name)
.map_err(|e| ResourceStorageError::from_io_error(e, resource_name.into()))?;
if data.len() < mem::size_of::<SizeType>() + PADDING_SIZE {
return Err(ResourceStorageError::UnexpectedDataSize);
}
let size = flatdata_read_bytes!(SizeType, data.as_ptr()) as usize;
if size + mem::size_of::<SizeType>() + PADDING_SIZE != data.len() {
return Err(ResourceStorageError::UnexpectedDataSize);
}
let stored_schema_slice: &[u8] = schema;
let stored_schema =
str::from_utf8(stored_schema_slice).map_err(ResourceStorageError::Utf8Error)?;
if stored_schema != expected_schema {
return Err(ResourceStorageError::WrongSignature {
resource_name: resource_name.into(),
diff: compute_diff(stored_schema, expected_schema),
});
}
Ok(&data[mem::size_of::<SizeType>()..][..size])
}
}
/// Post-processes the result of loading an optional resource.
///
/// * `Err(Missing)` means the resource is simply absent and becomes `Ok(None)`.
/// * Any other error is returned unchanged.
/// * A loaded value is measured with `size_fn` only when `max_size` is set. If the
///   measured size exceeds the limit, `TooBig` naming `resource_name` is returned.
#[doc(hidden)]
pub fn check_optional_resource<T>(
    resource_name: &'static str,
    size_fn: impl FnOnce(&T) -> usize,
    max_size: Option<usize>,
    x: Result<T, ResourceStorageError>,
) -> Result<Option<T>, ResourceStorageError> {
    let resource = match x {
        Ok(resource) => resource,
        Err(ResourceStorageError::Missing) => return Ok(None),
        Err(other) => return Err(other),
    };
    if let Some(limit) = max_size {
        let size = size_fn(&resource);
        if size > limit {
            return Err(ResourceStorageError::TooBig {
                size,
                resource_name,
            });
        }
    }
    Ok(Some(resource))
}
/// Post-processes the result of loading a mandatory resource.
///
/// * `Err(Missing)` is turned into `MissingData`, because the resource is required.
/// * Any other error is returned unchanged.
/// * A loaded value is measured with `size_fn` only when `max_size` is set. If the
///   measured size exceeds the limit, `TooBig` naming `resource_name` is returned.
#[doc(hidden)]
pub fn check_resource<T>(
    resource_name: &'static str,
    size_fn: impl FnOnce(&T) -> usize,
    max_size: Option<usize>,
    x: Result<T, ResourceStorageError>,
) -> Result<T, ResourceStorageError> {
    let resource = x.map_err(|err| match err {
        ResourceStorageError::Missing => ResourceStorageError::MissingData,
        other => other,
    })?;
    match max_size.map(|limit| (size_fn(&resource), limit)) {
        Some((size, limit)) if size > limit => Err(ResourceStorageError::TooBig {
            size,
            resource_name,
        }),
        _ => Ok(resource),
    }
}
#[doc(hidden)]
pub fn create_external_vector<'a, T>(
storage: &'a dyn ResourceStorage,
resource_name: &str,
schema: &str,
) -> io::Result<ExternalVector<'a, T>>
where
T: Struct,
{
let schema_name = format!("{}.schema", resource_name);
let mut stream = storage.create_output_stream(&schema_name)?;
stream.write_all(schema.as_bytes())?;
let data_writer = storage.create_output_stream(resource_name)?;
let handle =
ResourceHandle::try_new(storage, resource_name.into(), schema.into(), data_writer)?;
Ok(ExternalVector::new(handle))
}
#[doc(hidden)]
pub fn create_multi_vector<'a, Ts>(
storage: &'a dyn ResourceStorage,
resource_name: &str,
schema: &str,
) -> io::Result<MultiVector<'a, Ts>>
where
Ts: VariadicRefFactory,
{
let index_name = format!("{}_index", resource_name);
let index_schema = format!("index({})", schema);
let index = create_external_vector(storage, &index_name, &index_schema)?;
let schema_name = format!("{}.schema", resource_name);
let mut stream = storage.create_output_stream(&schema_name)?;
stream.write_all(schema.as_bytes())?;
let data_writer = storage.create_output_stream(resource_name)?;
let handle =
ResourceHandle::try_new(storage, resource_name.into(), schema.into(), data_writer)?;
Ok(MultiVector::new(index, handle))
}
/// Registers an archive called `name` by writing its signature resource
/// `<name>.archive` (empty payload, `schema` as its schema).
///
/// # Errors
/// Returns an `AlreadyExists`-derived error if the signature resource is already
/// present. Otherwise returns any I/O error raised while writing it.
#[doc(hidden)]
pub fn create_archive(
    name: &str,
    schema: &str,
    storage: &StorageHandle,
) -> Result<(), ResourceStorageError> {
    let signature_name = format!("{}.archive", name);
    if storage.exists(&signature_name) {
        let already_exists = io::Error::new(io::ErrorKind::AlreadyExists, signature_name.clone());
        return Err(ResourceStorageError::from_io_error(already_exists, signature_name));
    }
    storage
        .write(&signature_name, schema, &[])
        .map_err(|e| ResourceStorageError::from_io_error(e, signature_name))
}
/// Incremental writer for a single resource.
///
/// Creation writes a zero size header. [`ResourceHandle::write`] appends payload
/// bytes. [`ResourceHandle::close`] appends padding, patches the header with the final
/// payload size, flushes, and reads the resource back through the storage.
pub struct ResourceHandle<'a> {
    // Stream the resource bytes are written to.
    stream: Box<dyn Stream>,
    // Payload bytes written so far (excludes size header and padding).
    size_in_bytes: usize,
    // Storage the resource is read back from on close.
    storage: &'a dyn ResourceStorage,
    // Resource name within `storage`.
    name: String,
    // Schema the resource is validated against on close.
    schema: String,
    // Set once `finalize` has started.
    finalized: bool,
}

impl<'a> ResourceHandle<'a> {
    /// Wraps `stream` and writes a placeholder size header of 0.
    ///
    /// # Errors
    /// Propagates the I/O error if the header cannot be written.
    pub(crate) fn try_new(
        storage: &'a dyn ResourceStorage,
        name: String,
        schema: String,
        mut stream: Box<dyn Stream>,
    ) -> io::Result<Self> {
        write_size(0u64, &mut stream)?;
        Ok(Self {
            stream,
            size_in_bytes: 0,
            storage,
            name,
            schema,
            finalized: false,
        })
    }

    /// Appends `data` to the payload. The size counter only advances if the whole
    /// write succeeds.
    pub(crate) fn write(&mut self, data: &[u8]) -> io::Result<()> {
        let res = self.stream.write_all(data);
        if res.is_ok() {
            self.size_in_bytes += data.len();
        }
        res
    }

    /// Finalizes the resource and returns its validated payload as read back from storage.
    ///
    /// # Errors
    /// Returns I/O-derived errors from finalization, or any error from
    /// [`ResourceStorage::read`].
    pub(crate) fn close(mut self) -> Result<&'a [u8], ResourceStorageError> {
        self.finalize()?;
        self.storage.read(&self.name, &self.schema)
    }

    /// Name of the resource being written.
    pub(crate) fn name(&self) -> &str {
        &self.name
    }

    /// Writes trailing padding, rewrites the size header at offset 0, and flushes.
    ///
    /// # Panics
    /// Panics if called more than once.
    fn finalize(&mut self) -> Result<(), ResourceStorageError> {
        assert!(!self.finalized);
        self.finalized = true;
        let resource_name = self.name.clone();
        let into_storage_error = |e| ResourceStorageError::from_io_error(e, resource_name.clone());
        write_padding(&mut self.stream).map_err(into_storage_error)?;
        self.stream
            .seek(io::SeekFrom::Start(0u64))
            .map_err(into_storage_error)?;
        write_size(self.size_in_bytes as u64, &mut self.stream).map_err(into_storage_error)?;
        // `close` reads the resource back while this stream is still alive. Flush so a
        // buffered stream cannot leave the patched header unwritten.
        self.stream.flush().map_err(into_storage_error)?;
        Ok(())
    }
}
/// Debug output shows only the resource name and payload size; the stream and
/// storage are omitted.
impl<'a> fmt::Debug for ResourceHandle<'a> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let (name, size) = (&self.name, self.size_in_bytes);
        write!(f, "ResourceHandle {{ name: {}, size_in_bytes: {} }}", name, size)
    }
}
/// Renders a line-based diff of `left` against `right`.
///
/// Lines only in `left` are prefixed with `-`, lines only in `right` with `+`, and
/// common lines with a space. The rendered lines are joined with `\n`.
fn compute_diff(left: &str, right: &str) -> String {
    let mut rendered = Vec::new();
    for line in diff::lines(left, right) {
        let entry = match line {
            diff::Result::Left(removed) => format!("-{}", removed),
            diff::Result::Both(kept, _) => format!(" {}", kept),
            diff::Result::Right(added) => format!("+{}", added),
        };
        rendered.push(entry);
    }
    rendered.join("\n")
}
/// Serializes `data` as a size header, the raw bytes, then zero padding.
fn write_to_stream(data: &[u8], stream: &mut dyn Stream) -> io::Result<()> {
    let payload_len = data.len() as u64;
    write_size(payload_len, stream)?;
    stream.write_all(data)?;
    write_padding(stream)?;
    Ok(())
}
/// Writes the schema text verbatim (UTF-8 bytes, no terminator or framing).
fn write_schema(schema: &str, stream: &mut dyn Stream) -> io::Result<()> {
    let bytes = schema.as_bytes();
    stream.write_all(bytes)
}
/// Encodes `value` into a `SizeType`-sized buffer with `flatdata_write_bytes!` and
/// writes that buffer to `stream`.
fn write_size(value: SizeType, stream: &mut dyn Stream) -> io::Result<()> {
    const SIZE_BYTES: usize = mem::size_of::<SizeType>();
    let mut encoded = [0u8; SIZE_BYTES];
    flatdata_write_bytes!(SizeType; value, &mut encoded, 0, SIZE_BYTES * 8);
    stream.write_all(&encoded)
}
/// Appends `PADDING_SIZE` zero bytes to `stream`.
fn write_padding(stream: &mut dyn Stream) -> io::Result<()> {
    stream.write_all(&[0u8; PADDING_SIZE])
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::memstorage::MemoryResourceStorage;

    /// Closing a handle that received no payload must succeed and yield an empty payload.
    #[test]
    fn test_not_panic_on_close() -> Result<(), ResourceStorageError> {
        let storage = MemoryResourceStorage::new("/root/extvec");
        let mut schema_stream = storage
            .create_output_stream("/root/extvec/blubb.schema")
            .unwrap();
        schema_stream.write_all(b"myschema").unwrap();
        let data_stream = storage.create_output_stream("/root/extvec/blubb").unwrap();
        let handle = ResourceHandle::try_new(
            &*storage,
            "/root/extvec/blubb".into(),
            "myschema".into(),
            data_stream,
        )
        .unwrap();
        let payload = handle.close()?;
        assert!(payload.is_empty());
        Ok(())
    }
}