use super::{BlockLocation, FileId, FileReader, FileRw, FileWriter, StorageBackend, StorageError};
use crate::circuit::metrics::FILES_CREATED;
use crate::storage::buffer_cache::FBuf;
use feldera_storage::{FileCommitter, StorageFileType, StoragePath};
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::{
collections::HashMap,
io::ErrorKind,
sync::{Arc, RwLock},
};
struct MemoryFile {
file_id: FileId,
path: StoragePath,
blocks: Vec<(u64, Arc<FBuf>)>,
size: u64,
}
impl FileRw for MemoryFile {
fn file_id(&self) -> FileId {
self.file_id
}
fn path(&self) -> &StoragePath {
&self.path
}
}
struct MemoryBackendInner {
files: RwLock<HashMap<StoragePath, Arc<MemoryFile>>>,
usage: Arc<AtomicI64>,
}
#[derive(Clone)]
pub struct MemoryBackend(Arc<MemoryBackendInner>);
impl Default for MemoryBackend {
fn default() -> Self {
Self(Arc::new(MemoryBackendInner {
files: RwLock::new(HashMap::new()),
usage: Arc::new(AtomicI64::new(0)),
}))
}
}
impl MemoryBackend {
pub fn new() -> Self {
Self::default()
}
}
struct MemoryWriter {
backend: MemoryBackend,
file: MemoryFile,
drop: DeleteOnDrop,
}
impl MemoryWriter {
fn new(backend: MemoryBackend, name: &StoragePath) -> Self {
Self {
file: MemoryFile {
file_id: FileId::new(),
path: name.clone(),
blocks: Vec::new(),
size: 0,
},
drop: DeleteOnDrop {
usage: backend.0.usage.clone(),
size: 0,
},
backend,
}
}
}
impl FileRw for MemoryWriter {
fn file_id(&self) -> FileId {
self.file.file_id
}
fn path(&self) -> &StoragePath {
&self.file.path
}
}
impl FileWriter for MemoryWriter {
fn write_block(&mut self, data: FBuf) -> Result<Arc<FBuf>, StorageError> {
let data = Arc::new(data);
self.file.blocks.push((self.file.size, data.clone()));
self.file.size += data.len() as u64;
self.drop.size += data.len() as u64;
self.drop
.usage
.fetch_add(data.len() as i64, Ordering::Relaxed);
Ok(data)
}
fn complete(mut self: Box<Self>) -> Result<Arc<dyn FileReader>, StorageError> {
self.drop.size = 0;
let file = Arc::new(self.file);
self.backend
.0
.files
.write()
.unwrap()
.insert(file.path.clone(), file.clone());
let reader = Arc::new(MemoryReader {
backend: self.backend,
file,
keep: AtomicBool::new(false),
});
Ok(reader)
}
}
struct DeleteOnDrop {
usage: Arc<AtomicI64>,
size: u64,
}
impl Drop for DeleteOnDrop {
fn drop(&mut self) {
self.usage.fetch_sub(self.size as i64, Ordering::Relaxed);
}
}
struct MemoryReader {
backend: MemoryBackend,
file: Arc<MemoryFile>,
keep: AtomicBool,
}
impl Debug for MemoryReader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "MemoryReader({})", self.file.path)
}
}
impl FileRw for MemoryReader {
fn file_id(&self) -> FileId {
self.file.file_id
}
fn path(&self) -> &StoragePath {
&self.file.path
}
}
impl FileCommitter for MemoryReader {
fn commit(&self) -> Result<(), StorageError> {
Ok(())
}
}
impl FileReader for MemoryReader {
fn mark_for_checkpoint(&self) {
self.keep.store(true, Ordering::Relaxed);
}
fn read_block(&self, location: BlockLocation) -> Result<Arc<FBuf>, StorageError> {
if location.after() > self.file.size {
return Err(StorageError::stdio(
ErrorKind::UnexpectedEof,
"read",
&self.file.path,
));
}
let index = self
.file
.blocks
.partition_point(|(offset, _)| *offset <= location.offset);
let index = match self.file.blocks.get(index) {
Some((offset, data)) if location.offset == *offset => {
if location.size == data.len() {
return Ok(data.clone());
}
index
}
_ => index - 1,
};
let mut block = FBuf::with_capacity(location.size);
for (offset, data) in &self.file.blocks[index..] {
let start = location.offset.max(*offset) - *offset;
let end = location.after().min(*offset + data.len() as u64) - *offset;
block.extend_from_slice(&data[start as usize..end as usize]);
if block.len() >= location.size {
debug_assert_eq!(block.len(), location.size);
return Ok(Arc::new(block));
}
}
unreachable!();
}
fn get_size(&self) -> Result<u64, StorageError> {
Ok(self.file.size)
}
}
impl Drop for MemoryReader {
fn drop(&mut self) {
if !self.keep.load(Ordering::Relaxed) {
let _ = self.backend.delete(&self.file.path);
}
}
}
impl StorageBackend for MemoryBackend {
fn create_named(&self, name: &StoragePath) -> Result<Box<dyn FileWriter>, StorageError> {
let fm = MemoryWriter::new(self.clone(), name);
FILES_CREATED.fetch_add(1, Ordering::Relaxed);
Ok(Box::new(fm))
}
fn open(&self, name: &StoragePath) -> Result<Arc<dyn FileReader>, StorageError> {
let files = self.0.files.read().unwrap();
match files.get(name) {
Some(file) => Ok(Arc::new(MemoryReader {
backend: self.clone(),
file: file.clone(),
keep: AtomicBool::new(true),
})),
None => Err(StorageError::stdio(ErrorKind::NotFound, "open", name)),
}
}
fn list(
&self,
parent: &StoragePath,
cb: &mut dyn FnMut(&StoragePath, StorageFileType),
) -> Result<(), StorageError> {
let entries = self
.0
.files
.read()
.unwrap()
.iter()
.filter(|&(name, _file)| name.prefix_matches(parent))
.map(|(name, file)| (name.clone(), file.size))
.collect::<Vec<_>>();
for (path, size) in entries {
cb(&path, StorageFileType::File { size });
}
Ok(())
}
fn delete(&self, name: &StoragePath) -> Result<(), StorageError> {
let mut files = self.0.files.write().unwrap();
match files.remove(name) {
Some(file) => {
self.0.usage.fetch_sub(file.size as i64, Ordering::Relaxed);
Ok(())
}
None => Err(StorageError::stdio(ErrorKind::NotFound, "delete", name)),
}
}
fn delete_recursive(&self, parent: &StoragePath) -> Result<(), StorageError> {
self.0
.files
.write()
.unwrap()
.retain(|name, _content| !name.prefix_matches(parent));
Ok(())
}
fn usage(&self) -> Arc<AtomicI64> {
self.0.usage.clone()
}
}
#[cfg(test)]
mod tests {
use feldera_storage::StorageBackend;
use std::{path::Path, sync::Arc};
use crate::storage::backend::{
memory_impl::MemoryBackend,
tests::{random_sizes, test_backend},
};
fn create_memory_backend(_path: &Path) -> Arc<dyn StorageBackend> {
Arc::new(MemoryBackend::new())
}
#[test]
fn sequential_1024() {
test_backend(Box::new(create_memory_backend), &[1024; 1024 * 10], true)
}
#[test]
fn delete_1024() {
test_backend(Box::new(create_memory_backend), &[1024; 1024 * 10], false)
}
#[test]
fn sequential_random() {
test_backend(Box::new(create_memory_backend), &random_sizes(), true);
}
#[test]
fn empty() {
test_backend(Box::new(create_memory_backend), &[], true);
}
}