use std::{cell::RefCell, collections::HashMap, pin::Pin, rc::Rc};
use futures::{future, stream, Future, Stream};
/// Asynchronous, byte-oriented storage interface offering key/value and
/// append-only stream operations.
///
/// Every operation hands back a boxed, pinned future or stream. The lifetime
/// `'a` on each method is not tied to `&self` or `key` in the signature, so the
/// returned future/stream cannot borrow from either of them.
pub trait StorageBackend {
/// Returns a future resolving to a copy of the bytes stored under `key`, or
/// `None` when nothing is stored there.
fn get_key<'a>(&self, key: &str) -> Pin<Box<dyn Future<Output = Option<Vec<u8>>> + 'a>>;
/// Returns a future that stores `value` under `key`.
///
/// NOTE(review): the trait does not say whether the write happens at call
/// time or when the future is polled; `MemoryStorageBackend` writes at call
/// time.
fn set_key<'a>(&self, key: &str, value: Vec<u8>) -> Pin<Box<dyn Future<Output = ()> + 'a>>;
/// Returns a stream of byte chunks for the data stored under `key`.
///
/// NOTE(review): chunking is implementation-defined; `MemoryStorageBackend`
/// yields the whole value as a single chunk, or nothing if the key is absent.
fn get_stream<'a>(&self, key: &str) -> Pin<Box<dyn Stream<Item = Vec<u8>> + 'a>>;
/// Returns a future that appends `value` to the data stored under `key`.
///
/// NOTE(review): the meaning of `expected_offset` is not fixed by the trait.
/// `MemoryStorageBackend` treats `Some(n)` as a precondition that the stream
/// currently holds exactly `n` bytes and panics otherwise; `None` skips the
/// check. Confirm that other implementations agree.
fn append_to_stream<'a>(
&self,
key: &str,
value: Vec<u8>,
expected_offset: Option<usize>,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>;
/// Returns a new boxed trait-object handle to this backend.
///
/// NOTE(review): presumably the handle shares state with `self` (it does for
/// `MemoryStorageBackend`, which clones an `Rc`) — verify per implementation.
fn clone_ref(&self) -> Box<dyn StorageBackend>;
}
/// In-memory storage backend that keeps every value in a hash map.
///
/// The map sits behind `Rc<RefCell<..>>`, so cloning the backend produces a
/// second handle onto the *same* data rather than an independent copy.
#[derive(Clone, Default)]
pub struct MemoryStorageBackend {
    /// Key -> stored bytes, shared by every clone of this backend.
    data: Rc<RefCell<HashMap<String, Vec<u8>>>>,
}

impl MemoryStorageBackend {
    /// Creates an empty backend backed by its own freshly allocated map.
    pub fn new() -> Self {
        Self::default()
    }
}
impl StorageBackend for MemoryStorageBackend {
    /// Looks `key` up in the map and returns an already-resolved future holding
    /// a copy of the stored bytes, or `None` if the key is absent.
    fn get_key<'a>(&self, key: &str) -> Pin<Box<dyn Future<Output = Option<Vec<u8>>> + 'a>> {
        let found = self.data.borrow().get(key).cloned();
        Box::pin(future::ready(found))
    }

    /// Stores `value` under `key`, replacing any previous value.
    ///
    /// The write happens during this call; the returned future is already
    /// resolved.
    fn set_key<'a>(&self, key: &str, value: Vec<u8>) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
        self.data.borrow_mut().insert(key.to_owned(), value);
        Box::pin(future::ready(()))
    }

    /// Returns a stream over the data stored under `key`.
    ///
    /// The stream yields the whole stored value as one chunk, or yields nothing
    /// when the key is absent. The value is copied during this call, so later
    /// writes do not affect the returned stream.
    fn get_stream<'a>(&self, key: &str) -> Pin<Box<dyn Stream<Item = Vec<u8>> + 'a>> {
        let chunks: Vec<Vec<u8>> = match self.data.borrow().get(key) {
            Some(bytes) => vec![bytes.clone()],
            None => Vec::new(),
        };
        Box::pin(stream::iter(chunks))
    }

    /// Appends `value` to the bytes stored under `key`, creating an empty entry
    /// first if the key is absent.
    ///
    /// The append happens during this call; the returned future is already
    /// resolved.
    ///
    /// # Panics
    ///
    /// Panics if `expected_offset` is `Some(n)` and the entry does not currently
    /// hold exactly `n` bytes.
    fn append_to_stream<'a>(
        &self,
        key: &str,
        value: Vec<u8>,
        expected_offset: Option<usize>,
    ) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
        let mut map = self.data.borrow_mut();
        let buffer = map.entry(key.to_owned()).or_default();
        if let Some(offset) = expected_offset {
            assert_eq!(buffer.len(), offset);
        }
        buffer.extend(value);
        Box::pin(future::ready(()))
    }

    /// Returns a boxed handle that shares this backend's underlying map.
    fn clone_ref(&self) -> Box<dyn StorageBackend> {
        let handle: MemoryStorageBackend = self.clone();
        Box::new(handle)
    }
}
/// Storage backend that forwards each operation to one of several underlying
/// backends, chosen from the key.
pub struct ShardedStorageBackend {
// Underlying backends; a key's shard index is an index into this vector.
shards: Vec<Box<dyn StorageBackend>>,
}
impl ShardedStorageBackend {
    /// Creates a sharded backend that distributes keys across `shards`.
    ///
    /// `shards` should be non-empty: construction itself never fails, but every
    /// key lookup on a backend with no shards panics.
    pub fn new(shards: Vec<Box<dyn StorageBackend>>) -> Self {
        Self { shards }
    }

    /// Maps `key` to the index of the shard responsible for it.
    ///
    /// The index is derived by hashing the final four bytes of `key` (or the
    /// whole key when it is shorter than four bytes) and reducing the hash
    /// modulo the number of shards.
    ///
    /// # Panics
    ///
    /// Panics if the backend was constructed with no shards.
    fn get_shard_idx(&self, key: &str) -> usize {
        assert!(
            !self.shards.is_empty(),
            "ShardedStorageBackend has no shards to route keys to"
        );
        let key_bytes = key.as_bytes();
        // `saturating_sub` keeps short keys (fewer than 4 bytes, including the
        // empty key) from underflowing the slice start; they are hashed whole.
        let suffix_start = key_bytes.len().saturating_sub(4);
        let key_suffix_int = fxhash::hash32(&key_bytes[suffix_start..]);
        // u32 -> usize is lossless on 32- and 64-bit targets.
        key_suffix_int as usize % self.shards.len()
    }
}
impl StorageBackend for ShardedStorageBackend {
    /// Forwards the read to the shard selected for `key`.
    fn get_key<'a>(&self, key: &str) -> Pin<Box<dyn Future<Output = Option<Vec<u8>>> + 'a>> {
        let shard = &self.shards[self.get_shard_idx(key)];
        shard.get_key(key)
    }

    /// Forwards the write to the shard selected for `key`.
    fn set_key<'a>(&self, key: &str, value: Vec<u8>) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
        let shard = &self.shards[self.get_shard_idx(key)];
        shard.set_key(key, value)
    }

    /// Forwards the stream read to the shard selected for `key`.
    fn get_stream<'a>(&self, key: &str) -> Pin<Box<dyn Stream<Item = Vec<u8>> + 'a>> {
        let shard = &self.shards[self.get_shard_idx(key)];
        shard.get_stream(key)
    }

    /// Forwards the append, including `expected_offset`, to the shard selected
    /// for `key`.
    fn append_to_stream<'a>(
        &self,
        key: &str,
        value: Vec<u8>,
        expected_offset: Option<usize>,
    ) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
        let shard = &self.shards[self.get_shard_idx(key)];
        shard.append_to_stream(key, value, expected_offset)
    }

    /// Builds a new sharded backend whose shards are the `clone_ref` handles of
    /// this backend's shards, in the same order.
    fn clone_ref(&self) -> Box<dyn StorageBackend> {
        let mut shards = Vec::with_capacity(self.shards.len());
        for shard in &self.shards {
            shards.push(shard.clone_ref());
        }
        Box::new(Self { shards })
    }
}