tlpt 0.7.0

Telepathy: syncable, append-only logs and sets
Documentation
use std::{cell::RefCell, collections::HashMap, pin::Pin, rc::Rc};

use futures::{future, stream, Future, Stream};

pub trait StorageBackend {
    fn get_key<'a>(&self, key: &str) -> Pin<Box<dyn Future<Output = Option<Vec<u8>>> + 'a>>;
    fn set_key<'a>(&self, key: &str, value: Vec<u8>) -> Pin<Box<dyn Future<Output = ()> + 'a>>;

    fn get_stream<'a>(&self, key: &str) -> Pin<Box<dyn Stream<Item = Vec<u8>> + 'a>>;
    fn append_to_stream<'a>(
        &self,
        key: &str,
        value: Vec<u8>,
        expected_offset: Option<usize>,
    ) -> Pin<Box<dyn Future<Output = ()> + 'a>>;

    fn clone_ref(&self) -> Box<dyn StorageBackend>;
}

#[derive(Clone)]
pub struct MemoryStorageBackend {
    data: Rc<RefCell<HashMap<String, Vec<u8>>>>,
}

impl Default for MemoryStorageBackend {
    fn default() -> Self {
        Self::new()
    }
}

impl MemoryStorageBackend {
    pub fn new() -> Self {
        Self {
            data: Rc::new(RefCell::new(HashMap::new())),
        }
    }
}

impl StorageBackend for MemoryStorageBackend {
    fn get_key<'a>(&self, key: &str) -> Pin<Box<dyn Future<Output = Option<Vec<u8>>> + 'a>> {
        Box::pin(future::ready(self.data.borrow().get(key).cloned()))
    }

    fn set_key<'a>(&self, key: &str, value: Vec<u8>) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
        self.data.borrow_mut().insert(key.to_string(), value);
        Box::pin(future::ready(()))
    }

    fn get_stream<'a>(&self, key: &str) -> Pin<Box<dyn Stream<Item = Vec<u8>> + 'a>> {
        let data = self.data.clone();
        let single_chunk = data
            .borrow()
            .get(key)
            .cloned()
            .into_iter()
            .collect::<Vec<Vec<u8>>>();
        Box::pin(stream::iter(single_chunk))
    }

    fn append_to_stream<'a>(
        &self,
        key: &str,
        value: Vec<u8>,
        expected_offset: Option<usize>,
    ) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
        let mut data_ref = self.data.borrow_mut();
        let entry = data_ref.entry(key.to_string()).or_insert_with(Vec::new);

        if let Some(offset) = expected_offset {
            assert_eq!(entry.len(), offset);
        }

        entry.extend(value);
        Box::pin(future::ready(()))
    }

    fn clone_ref(&self) -> Box<dyn StorageBackend> {
        Box::new(self.clone())
    }
}

pub struct ShardedStorageBackend {
    shards: Vec<Box<dyn StorageBackend>>,
}

impl ShardedStorageBackend {
    pub fn new(shards: Vec<Box<dyn StorageBackend>>) -> Self {
        Self { shards }
    }

    fn get_shard_idx(&self, key: &str) -> usize {
        // TODO: hash! ascii is not distributed well
        let key_bytes = key.as_bytes();
        let key_suffix_int = fxhash::hash32(&key_bytes[key_bytes.len() - 4..]);
        key_suffix_int as usize % self.shards.len()
    }
}

impl StorageBackend for ShardedStorageBackend {
    fn get_key<'a>(&self, key: &str) -> Pin<Box<dyn Future<Output = Option<Vec<u8>>> + 'a>> {
        let shard_idx = self.get_shard_idx(key);
        self.shards[shard_idx].get_key(key)
    }

    fn set_key<'a>(&self, key: &str, value: Vec<u8>) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
        let shard_idx = self.get_shard_idx(key);
        self.shards[shard_idx].set_key(key, value)
    }

    fn get_stream<'a>(&self, key: &str) -> Pin<Box<dyn Stream<Item = Vec<u8>> + 'a>> {
        let shard_idx = self.get_shard_idx(key);
        self.shards[shard_idx].get_stream(key)
    }

    fn append_to_stream<'a>(
        &self,
        key: &str,
        value: Vec<u8>,
        expected_offset: Option<usize>,
    ) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
        let shard_idx = self.get_shard_idx(key);
        self.shards[shard_idx].append_to_stream(key, value, expected_offset)
    }

    fn clone_ref(&self) -> Box<dyn StorageBackend> {
        Box::new(ShardedStorageBackend {
            shards: self.shards.iter().map(|s| s.clone_ref()).collect(),
        })
    }
}