tempest-kv 0.0.2

Key-Value storage layer for TempestDB
Documentation
use std::collections::VecDeque;

use bytes::Bytes;
use tempest_io::Io;

use crate::{
    StorageError,
    base::{Comparer, InternalKey},
    iterator::StorageIterator,
};

/// A simple mock iterator for testing that emits the values inside in order.
#[derive(Default, Clone)]
pub(crate) struct MockIterator<C: Comparer> {
    data: VecDeque<(InternalKey<C, Bytes>, Bytes)>,
}

impl<C: Comparer> MockIterator<C> {
    pub(crate) fn new() -> Self {
        Default::default()
    }

    /// Add a custom key entry with `key` as the key and `value` as the value.
    pub(crate) fn add_with_key(
        mut self,
        key: InternalKey<C, Bytes>,
        value: impl Into<Bytes>,
    ) -> Self {
        let value = value.into();
        if let Some((k, _)) = self.data.iter().next_back() {
            assert!(
                k < &key,
                "must manually ensure mock iterator is in order, key: {:?}",
                key
            );
        }
        self.data.push_back((key, value));
        self
    }

    /// Add a test key entry with the id `key_id` and `value` as the value.
    pub(crate) fn add(self, key_id: u64, value: impl Into<Bytes>) -> Self {
        self.add_with_key(InternalKey::test(key_id), value)
    }

    pub(crate) fn add_items(
        self,
        items: impl IntoIterator<Item = (u64, u64, impl Into<Bytes>)>,
    ) -> Self {
        items.into_iter().fold(self, |iter, (id, seqnum, value)| {
            iter.add_with_key(InternalKey::test_with_seqnum(id, seqnum), value)
        })
    }
}

impl<I: Io, C: Comparer> StorageIterator<I, C> for MockIterator<C> {
    async fn next(&mut self) -> Result<Option<(InternalKey<C, Bytes>, Bytes)>, StorageError> {
        Ok(self.data.pop_front())
    }

    async fn seek(&mut self, key: InternalKey<C, Bytes>) -> Result<(), StorageError> {
        while let Some((front, _)) = self.data.front() {
            if front.compare_logical(&key).is_lt() {
                self.data.pop_front();
            } else {
                break;
            }
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use tempest_io::VirtualIo;
    use tempest_rt::block_on;

    use crate::base::{DefaultComparer, InternalKey};

    use super::*;

    type Mock = MockIterator<DefaultComparer>;

    async fn next(mock: &mut Mock) -> Option<(InternalKey<DefaultComparer, Bytes>, Bytes)> {
        <Mock as StorageIterator<VirtualIo, DefaultComparer>>::next(mock)
            .await
            .unwrap()
    }

    async fn seek(mock: &mut Mock, key_id: u64) {
        <Mock as StorageIterator<VirtualIo, DefaultComparer>>::seek(
            mock,
            InternalKey::test(key_id),
        )
        .await
        .unwrap();
    }

    #[test]
    fn test_next() {
        block_on(VirtualIo::default(), async {
            let mut mock = Mock::new().add(0, "a").add(1, "b").add(2, "c");

            let (key, val) = next(&mut mock).await.unwrap();
            assert_eq!(key.test_key_as_u64(), 0);
            assert_eq!(val, "a");

            let (key, val) = next(&mut mock).await.unwrap();
            assert_eq!(key.test_key_as_u64(), 1);
            assert_eq!(val, "b");

            let (key, val) = next(&mut mock).await.unwrap();
            assert_eq!(key.test_key_as_u64(), 2);
            assert_eq!(val, "c");

            assert!(next(&mut mock).await.is_none());
        });
    }

    #[test]
    fn test_seek_forward() {
        block_on(VirtualIo::default(), async {
            let mut mock = Mock::new().add(0, "a").add(1, "b").add(2, "c").add(3, "d");

            seek(&mut mock, 2).await;

            let (key, val) = next(&mut mock).await.unwrap();
            assert_eq!(key.test_key_as_u64(), 2);
            assert_eq!(val, "c");

            let (key, _) = next(&mut mock).await.unwrap();
            assert_eq!(key.test_key_as_u64(), 3);
        });
    }

    #[test]
    fn test_seek_backward_noop() {
        block_on(VirtualIo::default(), async {
            let mut mock = Mock::new().add(0, "a").add(1, "b").add(2, "c");

            seek(&mut mock, 2).await;

            // backward seek — cursor must not move
            seek(&mut mock, 0).await;

            let (key, _) = next(&mut mock).await.unwrap();
            assert_eq!(key.test_key_as_u64(), 2);
        });
    }
}