use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use crate::error::EdgestoreError;
#[derive(Debug, Clone, Copy)]
pub struct PlacementHint {
pub cohort_bucket: i64,
}
pub trait StorageBackend: Send + Sync {
fn read(&self, path: &Path, offset: u64, data: &mut [u8]) -> Result<usize, EdgestoreError>;
fn write(&self, path: &Path, offset: u64, data: &[u8]) -> Result<(), EdgestoreError>;
fn write_with_hint(
&self,
path: &Path,
offset: u64,
data: &[u8],
_hint: PlacementHint,
) -> Result<(), EdgestoreError> {
self.write(path, offset, data)
}
fn flush(&self, path: &Path) -> Result<(), EdgestoreError>;
fn read_all(&self, path: &Path) -> Result<Vec<u8>, EdgestoreError> {
let mut buf = Vec::new();
let mut offset = 0u64;
let chunk_size = 64 * 1024;
loop {
let mut chunk = vec![0u8; chunk_size];
let n = self.read(path, offset, &mut chunk)?;
if n == 0 {
break;
}
buf.extend_from_slice(&chunk[..n]);
offset += n as u64;
if n < chunk_size {
break;
}
}
Ok(buf)
}
}
#[derive(Default)]
pub struct DefaultStorageBackend;
impl DefaultStorageBackend {
pub fn new() -> Self {
DefaultStorageBackend
}
}
impl StorageBackend for DefaultStorageBackend {
fn read(&self, path: &Path, offset: u64, data: &mut [u8]) -> Result<usize, EdgestoreError> {
use std::io::{Read, Seek, SeekFrom};
let mut f = std::fs::File::open(path)?;
f.seek(SeekFrom::Start(offset))?;
let n = f.read(data)?;
Ok(n)
}
fn write(&self, path: &Path, offset: u64, data: &[u8]) -> Result<(), EdgestoreError> {
use std::io::{Seek, SeekFrom, Write};
std::fs::create_dir_all(path.parent().unwrap_or(Path::new("")))?;
let mut f = std::fs::OpenOptions::new()
.create(true)
.truncate(false)
.write(true)
.open(path)?;
f.seek(SeekFrom::Start(offset))?;
f.write_all(data)?;
Ok(())
}
fn flush(&self, path: &Path) -> Result<(), EdgestoreError> {
use std::io::Write;
let f = std::fs::OpenOptions::new().write(true).open(path)?;
let mut f = std::io::BufWriter::new(f);
f.flush()?;
let inner = f.into_inner().map_err(|e| EdgestoreError::Io(e.into_error()))?;
inner.sync_all()?;
Ok(())
}
}
pub struct MemoryStorageBackend {
files: Mutex<HashMap<PathBuf, Vec<u8>>>,
}
impl Default for MemoryStorageBackend {
fn default() -> Self {
Self::new()
}
}
impl MemoryStorageBackend {
pub fn new() -> Self {
MemoryStorageBackend {
files: Mutex::new(HashMap::new()),
}
}
}
impl StorageBackend for MemoryStorageBackend {
fn read(&self, path: &Path, offset: u64, data: &mut [u8]) -> Result<usize, EdgestoreError> {
let files = self.files.lock().unwrap();
let contents = files.get(path).cloned().unwrap_or_default();
let start = offset as usize;
if start >= contents.len() {
return Ok(0);
}
let end = (start + data.len()).min(contents.len());
let n = end - start;
data[..n].copy_from_slice(&contents[start..end]);
Ok(n)
}
fn write(&self, path: &Path, offset: u64, data: &[u8]) -> Result<(), EdgestoreError> {
let mut files = self.files.lock().unwrap();
let contents = files.entry(path.to_path_buf()).or_default();
let start = offset as usize;
let end = start + data.len();
if end > contents.len() {
contents.resize(end, 0);
}
contents[start..end].copy_from_slice(data);
Ok(())
}
fn flush(&self, _path: &Path) -> Result<(), EdgestoreError> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_default_backend_write_read() {
let backend = DefaultStorageBackend::new();
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.bin");
backend.write(&path, 0, b"hello world").unwrap();
backend.flush(&path).unwrap();
let mut buf = [0u8; 11];
let n = backend.read(&path, 0, &mut buf).unwrap();
assert_eq!(n, 11);
assert_eq!(&buf, b"hello world");
}
#[test]
fn test_default_backend_partial_read() {
let backend = DefaultStorageBackend::new();
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.bin");
backend.write(&path, 0, b"hello world").unwrap();
backend.flush(&path).unwrap();
let mut buf = [0u8; 5];
let n = backend.read(&path, 6, &mut buf).unwrap();
assert_eq!(n, 5);
assert_eq!(&buf, b"world");
}
#[test]
fn test_memory_backend_write_read() {
let backend = MemoryStorageBackend::new();
let path = Path::new("/tmp/test.bin");
backend.write(path, 0, b"hello world").unwrap();
backend.flush(path).unwrap();
let mut buf = [0u8; 11];
let n = backend.read(path, 0, &mut buf).unwrap();
assert_eq!(n, 11);
assert_eq!(&buf, b"hello world");
}
#[test]
fn test_memory_backend_overwrite() {
let backend = MemoryStorageBackend::new();
let path = Path::new("/tmp/test.bin");
backend.write(path, 0, b"hello").unwrap();
backend.write(path, 3, b"XX").unwrap();
let mut buf = [0u8; 5];
let n = backend.read(path, 0, &mut buf).unwrap();
assert_eq!(n, 5);
assert_eq!(&buf, b"helXX");
}
#[test]
fn test_memory_backend_isolated() {
let backend_a = MemoryStorageBackend::new();
let backend_b = MemoryStorageBackend::new();
let path = Path::new("/tmp/test.bin");
backend_a.write(path, 0, b"a").unwrap();
backend_b.write(path, 0, b"b").unwrap();
let mut buf = [0u8; 1];
backend_a.read(path, 0, &mut buf).unwrap();
assert_eq!(&buf, b"a");
backend_b.read(path, 0, &mut buf).unwrap();
assert_eq!(&buf, b"b");
}
}