tempest-kv 0.0.2

Key-Value storage layer for TempestDB
Documentation
use bytes::Bytes;

use crate::base::{Comparer, InternalKey};

/// Decides, entry by entry, whether a reader is allowed to see a key-value pair.
///
/// [`keep`] is handed the complete [`InternalKey`] together with its value, so
/// an implementation can inspect both the prefix and the suffix (see
/// [`split_up`]). An entry for which [`keep`] returns `false` is skipped as if
/// it had never been written.
///
/// Callers build a filter with whatever context it needs (a read timestamp or
/// a TTL threshold, for example). The filter is then cloned once for every
/// iterator stack that gets instantiated, so cloning should be cheap.
///
/// The KV layer is oblivious to what a filter actually checks; the meaning of
/// the decision belongs entirely to the implementor. When no filtering is
/// wanted, use `()`, which keeps every entry.
///
/// [`keep`]: Filter::keep
/// [`split_up`]: InternalKey::split_up
pub trait Filter<C>: Clone
where
    C: Comparer,
{
    /// Returns `true` to surface the entry, or `false` to skip it.
    ///
    /// Invoked once for each entry visited on the read and compaction paths.
    fn keep(&self, key: &InternalKey<C>, value: &Bytes) -> bool;
}

///// A [`StorageIterator`] adapter that skips entries rejected by a [`Filter`].
/////
///// On every step, [`Filter::keep`] is called with the current key and value.
///// Entries that return `false` are silently skipped and iteration continues.
///// Entries that return `true` are surfaced to the caller unchanged.
/////
///// This is used on both the read path (e.g. MVCC timestamp filtering, TTL
///// expiry) and the compaction path (e.g. watermark-based GC), with different
///// [`Filter`] implementations injected by the engine layer in each case.
//pub struct FilterIterator<'i, I, C, F>
//where
//    I: StorageIterator<'i, C>,
//    C: Comparer,
//    F: Filter<C>,
//{
//    inner: I,
//    filter: F,
//    _marker: PhantomData<(&'i (), C)>,
//}
//
//impl<'i, I, C, F> FilterIterator<'i, I, C, F>
//where
//    I: StorageIterator<'i, C>,
//    C: Comparer,
//    F: Filter<C>,
//{
//    /// Creates a new `FilterIterator` wrapping `inner`, applying `filter`
//    /// to every entry before surfacing it to the caller.
//    pub fn new(inner: I, filter: F) -> Self {
//        Self {
//            inner,
//            filter,
//            _marker: PhantomData,
//        }
//    }
//}
//
//impl<'i, I, C, F> StorageIterator<'i, C> for FilterIterator<'i, I, C, F>
//where
//    I: StorageIterator<'i, C>,
//    C: Comparer,
//    F: Filter<C>,
//{
//    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<crate::base::StorageResult<Option<()>>> {
//        loop {
//            match self.inner.poll_next(cx) {
//                Poll::Ready(Ok(Some(()))) => {
//                    let key = self.inner.key().unwrap();
//                    let value = self.inner.value().unwrap();
//                    if self.filter.keep(key, value) {
//                        return Poll::Ready(Ok(Some(())));
//                    }
//                }
//                other => return other,
//            }
//        }
//    }
//
//    fn poll_seek(
//        &mut self,
//        key: &[u8],
//        cx: &mut Context<'_>,
//    ) -> Poll<crate::base::StorageResult<()>> {
//        match self.inner.poll_seek(key, cx) {
//            Poll::Ready(Ok(())) => loop {
//                // NB: after seeking, we may have landed on an entry (e.g. a version) that the
//                // filter rejects, so advance until we find one it keeps or until we are exhausted
//                match self.inner.key() {
//                    Some(k) => {
//                        if self.filter.keep(k, self.inner.value().unwrap()) {
//                            return Poll::Ready(Ok(()));
//                        }
//                        match self.inner.poll_next(cx) {
//                            // we still have some left, so we advance to the next key
//                            Poll::Ready(Ok(Some(()))) => continue,
//                            // we have none left or errored, so bubble that up
//                            other => return other.map(|r| r.map(|_| ())),
//                        }
//                    }
//                    None => return Poll::Ready(Ok(())),
//                }
//            },
//            other => other,
//        }
//    }
//
//    fn key(&self) -> Option<&InternalKey<C>> {
//        self.inner.key()
//    }
//
//    fn value(&self) -> Option<&Bytes> {
//        self.inner.value()
//    }
//}

// Implements `Filter` for a tuple of filters, so several filters can be
// chained and passed wherever a single one is expected. An entry is kept only
// if every element keeps it. Elements are consulted left to right, and
// evaluation stops at the first rejection.
macro_rules! impl_filter_tuple {
    ($($F:ident),+) => {
        impl<C, $($F),+> Filter<C> for ($($F,)+)
        where
            C: Comparer,
            $($F: Filter<C>,)+
        {
            fn keep(&self, key: &InternalKey<C>, value: &Bytes) -> bool {
                // The type-parameter identifiers are reused as bindings for the
                // tuple elements, hence the lint allowance.
                #[allow(non_snake_case)]
                let ($($F,)+) = self;
                $(
                    if !$F.keep(key, value) {
                        return false;
                    }
                )+
                true
            }
        }
    };
}

// `()` is the passthrough filter: it accepts every entry unconditionally,
// whatever its key or value.
impl<C> Filter<C> for ()
where
    C: Comparer,
{
    fn keep(&self, _: &InternalKey<C>, _: &Bytes) -> bool {
        true
    }
}

impl_filter_tuple!(F1);
impl_filter_tuple!(F1, F2);
impl_filter_tuple!(F1, F2, F3);
impl_filter_tuple!(F1, F2, F3, F4);
impl_filter_tuple!(F1, F2, F3, F4, F5);
impl_filter_tuple!(F1, F2, F3, F4, F5, F6);
impl_filter_tuple!(F1, F2, F3, F4, F5, F6, F7);
impl_filter_tuple!(F1, F2, F3, F4, F5, F6, F7, F8);

//#[cfg(test)]
//mod tests {
//    use bytes::Bytes;
//
//    use crate::{
//        base::{DefaultComparer, InternalKey},
//        iterator::{
//            StorageIterator,
//            filter::{Filter, FilterIterator},
//            mock::MockIterator,
//        },
//    };
//
//    /// A filter that rejects entries whose value matches a given bytes value.
//    #[derive(Clone)]
//    struct RejectValueFilter(Bytes);
//
//    impl Filter<DefaultComparer> for RejectValueFilter {
//        fn keep(&self, _key: &InternalKey<DefaultComparer>, value: &Bytes) -> bool {
//            value != &self.0
//        }
//    }
//
//    /// A filter that rejects entries whose key id is odd.
//    #[derive(Clone)]
//    struct RejectOddKeyFilter;
//
//    impl Filter<DefaultComparer> for RejectOddKeyFilter {
//        fn keep(&self, key: &InternalKey<DefaultComparer>, _value: &Bytes) -> bool {
//            key.test_key_as_u64() % 2 == 0
//        }
//    }
//
//    #[tokio::test]
//    async fn test_passthrough_keeps_everything() {
//        let mock = MockIterator::<DefaultComparer>::new()
//            .add(0, "a")
//            .add(1, "b")
//            .add(2, "c");
//
//        let mut iter = FilterIterator::new(mock, ());
//        let mut count = 0;
//        while iter.next().await.unwrap().is_some() {
//            count += 1;
//        }
//        assert_eq!(count, 3);
//    }
//
//    #[tokio::test]
//    async fn test_filter_rejects_matching_value() {
//        let mock = MockIterator::<DefaultComparer>::new()
//            .add(0, "keep")
//            .add(1, "reject")
//            .add(2, "keep");
//
//        let mut iter = FilterIterator::new(mock, RejectValueFilter(Bytes::from("reject")));
//
//        iter.next().await.unwrap();
//        assert_eq!(iter.value().unwrap(), "keep");
//
//        iter.next().await.unwrap();
//        assert_eq!(iter.value().unwrap(), "keep");
//
//        assert!(iter.next().await.unwrap().is_none());
//    }
//
//    #[tokio::test]
//    async fn test_filter_rejects_all() {
//        let mock = MockIterator::<DefaultComparer>::new()
//            .add(0, "reject")
//            .add(1, "reject")
//            .add(2, "reject");
//
//        let mut iter = FilterIterator::new(mock, RejectValueFilter(Bytes::from("reject")));
//        assert!(iter.next().await.unwrap().is_none());
//    }
//
//    #[tokio::test]
//    async fn test_filter_rejects_odd_keys() {
//        let mock = MockIterator::<DefaultComparer>::new()
//            .add(0, "a")
//            .add(1, "b")
//            .add(2, "c")
//            .add(3, "d")
//            .add(4, "e");
//
//        let mut iter = FilterIterator::new(mock, RejectOddKeyFilter);
//        let mut keys = Vec::new();
//        while iter.next().await.unwrap().is_some() {
//            keys.push(iter.key().unwrap().test_key_as_u64());
//        }
//        assert_eq!(keys, vec![0, 2, 4]);
//    }
//
//    #[tokio::test]
//    async fn test_seek_skips_filtered_entry() {
//        let mock = MockIterator::<DefaultComparer>::new()
//            .add(0, "reject")
//            .add(1, "reject")
//            .add(2, "keep")
//            .add(3, "keep");
//
//        let mut iter = FilterIterator::new(mock, RejectValueFilter(Bytes::from("reject")));
//        // seek to 0, lands on filtered entry, should advance to 2
//        iter.seek(&InternalKey::<DefaultComparer>::test(0).key().to_vec())
//            .await
//            .unwrap();
//
//        assert_eq!(iter.key().unwrap().test_key_as_u64(), 2);
//        assert_eq!(iter.value().unwrap(), "keep");
//    }
//
//    #[tokio::test]
//    async fn test_seek_lands_on_valid_entry() {
//        let mock = MockIterator::<DefaultComparer>::new()
//            .add(0, "keep")
//            .add(1, "keep");
//
//        let mut iter = FilterIterator::new(mock, ());
//        iter.seek(&InternalKey::<DefaultComparer>::test(0).key().to_vec())
//            .await
//            .unwrap();
//
//        assert_eq!(iter.key().unwrap().test_key_as_u64(), 0);
//    }
//
//    #[tokio::test]
//    async fn test_seek_all_remaining_filtered() {
//        let mock = MockIterator::<DefaultComparer>::new()
//            .add(0, "keep")
//            .add(1, "reject")
//            .add(2, "reject");
//
//        let mut iter = FilterIterator::new(mock, RejectValueFilter(Bytes::from("reject")));
//        iter.seek(&InternalKey::<DefaultComparer>::test(1).key().to_vec())
//            .await
//            .unwrap();
//
//        assert!(iter.key().is_none());
//    }
//
//    #[tokio::test]
//    async fn test_chained_filters() {
//        // rejects odd keys AND entries with value "reject"
//        let mock = MockIterator::<DefaultComparer>::new()
//            .add(0, "keep") // passes both
//            .add(1, "keep") // rejected by odd key filter
//            .add(2, "reject") // rejected by value filter
//            .add(3, "reject") // rejected by both
//            .add(4, "keep"); // passes both
//
//        let filter = (RejectOddKeyFilter, RejectValueFilter(Bytes::from("reject")));
//        let mut iter = FilterIterator::new(mock, filter);
//
//        let mut keys = Vec::new();
//        while iter.next().await.unwrap().is_some() {
//            keys.push(iter.key().unwrap().test_key_as_u64());
//        }
//
//        assert_eq!(keys, vec![0, 4]);
//    }
//}