use std::ops::Range;
use std::sync::Arc;
use bytes::Bytes;
use futures::future::BoxFuture;
use tokio::sync::RwLock;
use crate::error::{Error, Result};
pub trait MultipartWriter: Send {
fn write_chunk(&mut self, data: Bytes) -> BoxFuture<'_, Result<()>>;
fn complete(self: Box<Self>) -> BoxFuture<'static, Result<()>>;
}
pub trait Storage: Send + Sync {
fn read_range(&self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
fn write(&self, data: Bytes) -> BoxFuture<'_, Result<()>>;
fn size(&self) -> BoxFuture<'_, Result<u64>>;
fn write_multipart(&self) -> BoxFuture<'_, Result<Box<dyn MultipartWriter>>>;
}
#[derive(Debug, Clone)]
pub struct InMemoryStorage {
data: Arc<RwLock<Vec<u8>>>,
}
impl InMemoryStorage {
pub fn new() -> Self {
Self {
data: Arc::new(RwLock::new(Vec::new())),
}
}
pub fn from_bytes(data: Vec<u8>) -> Self {
Self {
data: Arc::new(RwLock::new(data)),
}
}
}
impl Default for InMemoryStorage {
fn default() -> Self {
Self::new()
}
}
impl Storage for InMemoryStorage {
fn read_range(&self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
Box::pin(async move {
let data = self.data.read().await;
let start = range.start as usize;
let end = range.end as usize;
if end > data.len() {
return Err(Error::Storage(format!(
"read range {}..{} exceeds file size {}",
start,
end,
data.len()
)));
}
Ok(Bytes::copy_from_slice(&data[start..end]))
})
}
fn write(&self, bytes: Bytes) -> BoxFuture<'_, Result<()>> {
Box::pin(async move {
let mut data = self.data.write().await;
*data = bytes.to_vec();
Ok(())
})
}
fn size(&self) -> BoxFuture<'_, Result<u64>> {
Box::pin(async move {
let data = self.data.read().await;
Ok(data.len() as u64)
})
}
fn write_multipart(&self) -> BoxFuture<'_, Result<Box<dyn MultipartWriter>>> {
Box::pin(async move {
Ok(Box::new(InMemoryMultipart {
data: Arc::clone(&self.data),
buf: Vec::new(),
}) as Box<dyn MultipartWriter>)
})
}
}
struct InMemoryMultipart {
data: Arc<RwLock<Vec<u8>>>,
buf: Vec<u8>,
}
impl MultipartWriter for InMemoryMultipart {
fn write_chunk(&mut self, data: Bytes) -> BoxFuture<'_, Result<()>> {
Box::pin(async move {
self.buf.extend_from_slice(&data);
Ok(())
})
}
fn complete(self: Box<Self>) -> BoxFuture<'static, Result<()>> {
Box::pin(async move {
let mut guard = self.data.write().await;
*guard = self.buf;
Ok(())
})
}
}
#[derive(Clone)]
pub struct ObjectStoreBackend {
store: Arc<dyn object_store::ObjectStore>,
path: object_store::path::Path,
}
impl ObjectStoreBackend {
pub fn new(store: Arc<dyn object_store::ObjectStore>, path: object_store::path::Path) -> Self {
Self { store, path }
}
}
impl Storage for ObjectStoreBackend {
fn read_range(&self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
Box::pin(async move {
use object_store::ObjectStoreExt;
let bytes = self
.store
.get_range(&self.path, range)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
Ok(bytes)
})
}
fn write(&self, data: Bytes) -> BoxFuture<'_, Result<()>> {
Box::pin(async move {
use object_store::ObjectStoreExt;
self.store
.put(&self.path, data.into())
.await
.map_err(|e| Error::Storage(e.to_string()))?;
Ok(())
})
}
fn size(&self) -> BoxFuture<'_, Result<u64>> {
Box::pin(async move {
use object_store::ObjectStoreExt;
let meta = self
.store
.head(&self.path)
.await
.map_err(|e| Error::Storage(e.to_string()))?;
Ok(meta.size as u64)
})
}
fn write_multipart(&self) -> BoxFuture<'_, Result<Box<dyn MultipartWriter>>> {
Box::pin(async move {
use object_store::buffered::BufWriter;
let writer = BufWriter::with_capacity(
Arc::clone(&self.store),
self.path.clone(),
8 * 1024 * 1024,
);
Ok(Box::new(ObjectStoreMultipart { writer }) as Box<dyn MultipartWriter>)
})
}
}
struct ObjectStoreMultipart {
writer: object_store::buffered::BufWriter,
}
impl MultipartWriter for ObjectStoreMultipart {
fn write_chunk(&mut self, data: Bytes) -> BoxFuture<'_, Result<()>> {
Box::pin(async move {
use tokio::io::AsyncWriteExt;
self.writer
.write_all(&data)
.await
.map_err(|e| Error::Storage(e.to_string()))
})
}
fn complete(mut self: Box<Self>) -> BoxFuture<'static, Result<()>> {
Box::pin(async move {
use tokio::io::AsyncWriteExt;
self.writer
.shutdown()
.await
.map_err(|e| Error::Storage(e.to_string()))
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn in_memory_write_read() {
let storage = InMemoryStorage::new();
let payload = Bytes::from_static(b"hello world");
storage.write(payload.clone()).await.unwrap();
let size = storage.size().await.unwrap();
assert_eq!(size, 11);
let read = storage.read_range(0..5).await.unwrap();
assert_eq!(&read[..], b"hello");
}
#[tokio::test]
async fn in_memory_out_of_range() {
let storage = InMemoryStorage::from_bytes(vec![1, 2, 3]);
let result = storage.read_range(0..10).await;
assert!(result.is_err());
}
#[tokio::test]
async fn in_memory_overwrite() {
let storage = InMemoryStorage::new();
storage.write(Bytes::from_static(b"first")).await.unwrap();
storage.write(Bytes::from_static(b"second")).await.unwrap();
let size = storage.size().await.unwrap();
assert_eq!(size, 6);
let data = storage.read_range(0..6).await.unwrap();
assert_eq!(&data[..], b"second");
}
#[tokio::test]
async fn in_memory_multipart_streams_chunks() {
let storage = InMemoryStorage::from_bytes(b"stale".to_vec());
let mut writer = storage.write_multipart().await.unwrap();
writer
.write_chunk(Bytes::from_static(b"hello "))
.await
.unwrap();
writer
.write_chunk(Bytes::from_static(b"streaming "))
.await
.unwrap();
writer
.write_chunk(Bytes::from_static(b"world"))
.await
.unwrap();
writer.complete().await.unwrap();
assert_eq!(storage.size().await.unwrap(), 21);
let data = storage.read_range(0..21).await.unwrap();
assert_eq!(&data[..], b"hello streaming world");
}
}