laminarmq 0.0.5

A scalable, distributed message queue powered by a segmented, partitioned, replicated and immutable log.
Documentation
use super::{
    super::super::commit_log::segmented_log::segment::{SegmentStorage, SegmentStorageProvider},
    storage::{InMemStorage, InMemStorageError},
};
use async_trait::async_trait;
use std::{cell::RefCell, collections::BTreeMap, rc::Rc};

/// Shared, interior-mutable byte buffer: a reference-counted `RefCell` around a `Vec<u8>`.
type Mem = Rc<RefCell<Vec<u8>>>;

/// Ordered map from a segment base index to that segment's `(index, store)` buffer pair.
type Map<Idx> = BTreeMap<Idx, (Mem, Mem)>;

/// Shared, interior-mutable handle to the segment `Map`.
type StorageMap<Idx> = Rc<RefCell<Map<Idx>>>;

/// In-memory segment storage provider backed by a shared map of per-segment byte buffers.
///
/// `Idx` is the segment base index type; it is used as the key of the underlying ordered map.
pub struct InMemSegmentStorageProvider<Idx> {
    /// Shared map from segment base index to that segment's `(index, store)` buffers.
    storage_map: StorageMap<Idx>,
}

impl<Idx> Default for InMemSegmentStorageProvider<Idx> {
    fn default() -> Self {
        Self {
            storage_map: Default::default(),
        }
    }
}

impl<Idx> Clone for InMemSegmentStorageProvider<Idx> {
    fn clone(&self) -> Self {
        Self {
            storage_map: self.storage_map.clone(),
        }
    }
}

#[async_trait(?Send)]
impl<Idx> SegmentStorageProvider<InMemStorage, Idx> for InMemSegmentStorageProvider<Idx>
where
    Idx: Clone + Ord,
{
    /// Returns the base indices of the segments held by this provider, in ascending order.
    ///
    /// Before the indices are collected, segments are removed from the high end of the map
    /// for as long as the segment with the greatest base index has an empty index buffer.
    /// Removal stops at the first segment (counting from the greatest base index) whose
    /// index buffer is non-empty; segments below it are kept even if their index is empty.
    ///
    /// # Errors
    ///
    /// Returns `InMemStorageError::BorrowError` if the shared segment map, or the index
    /// buffer being inspected, is currently borrowed in a conflicting way. Segments removed
    /// before the error occurred stay removed.
    async fn obtain_base_indices_of_stored_segments(
        &mut self,
    ) -> Result<Vec<Idx>, InMemStorageError> {
        // A single mutable borrow covers both the pruning loop and the key collection;
        // there is no await point in between, so nothing else can observe the map meanwhile.
        let mut storage_map = self
            .storage_map
            .try_borrow_mut()
            .map_err(|_| InMemStorageError::BorrowError)?;

        while let Some((_, (index, _))) = storage_map.last_key_value() {
            let index_is_empty = index
                .try_borrow()
                .map_err(|_| InMemStorageError::BorrowError)?
                .is_empty();

            if !index_is_empty {
                break;
            }

            // The entry was just observed, so this always removes exactly that segment.
            let _ = storage_map.pop_last();
        }

        let base_indices: Vec<Idx> = storage_map.keys().cloned().collect();

        Ok(base_indices)
    }

    /// Returns the storage handles for the segment with base index `segment_base_idx`.
    ///
    /// If no segment is registered under that base index, a new entry with empty index and
    /// store buffers is inserted first. The returned `InMemStorage` instances are built from
    /// clones of the map's reference-counted buffer handles.
    ///
    /// # Errors
    ///
    /// Returns `InMemStorageError::BorrowError` if the shared segment map is currently
    /// borrowed, and propagates any error returned by `InMemStorage::new`.
    async fn obtain(
        &mut self,
        segment_base_idx: &Idx,
    ) -> Result<SegmentStorage<InMemStorage>, InMemStorageError> {
        let mut storage_map = self
            .storage_map
            .try_borrow_mut()
            .map_err(|_| InMemStorageError::BorrowError)?;

        // One lookup: fetch the existing buffers or insert fresh empty ones.
        let segment_buffers = storage_map
            .entry(segment_base_idx.clone())
            .or_insert_with(|| {
                (
                    Rc::new(RefCell::new(Vec::<u8>::new())),
                    Rc::new(RefCell::new(Vec::<u8>::new())),
                )
            });

        Ok(SegmentStorage {
            index: InMemStorage::new(Rc::clone(&segment_buffers.0))?,
            store: InMemStorage::new(Rc::clone(&segment_buffers.1))?,
        })
    }
}

#[cfg(test)]
mod tests {
    use super::{
        super::super::super::{
            super::common::serde_compat::bincode, commit_log::segmented_log::segment,
        },
        *,
    };
    use std::marker::PhantomData;

    /// Drives the shared segment read/append/truncate consistency check with an in-memory
    /// provider keyed by `u32` base indices.
    #[test]
    fn test_segment_read_append_truncate_consistency() {
        futures_lite::future::block_on(async {
            let storage_provider = InMemSegmentStorageProvider::<u32>::default();

            // Carries type-level parameters only; no value is stored.
            let type_params = PhantomData::<((), crc32fast::Hasher, bincode::BinCode)>;

            segment::test::_test_segment_read_append_truncate_consistency(
                storage_provider,
                type_params,
            )
            .await;
        });
    }
}