// tempest-kv 0.0.2
//
// Key-Value storage layer for TempestDB
// Documentation
use std::{
    marker::PhantomData,
    task::{Context, Poll},
};

use bytes::Bytes;
use tempest_io::Io;

use crate::{
    StorageError,
    base::{Comparer, InternalKey},
    iterator::StorageIterator,
};

/// Adapter over a [`StorageIterator`] that emits at most one entry per run of
/// logically equal keys.
///
/// An entry coming from `inner` is dropped when its key compares logically
/// equal (via `compare_logical`) to the key of the entry emitted just before
/// it; otherwise it is emitted and becomes the new reference key.
pub(crate) struct LogicalDedupIterator<I, C, S>
where
    I: Io,
    C: Comparer,
    S: StorageIterator<I, C>,
{
    /// The wrapped iterator that supplies the raw, possibly multi-version,
    /// stream of entries.
    inner: S,
    /// Key of the most recently emitted entry, or `None` if nothing has been
    /// emitted yet. Incoming keys are compared against this to detect
    /// superseded versions.
    prev: Option<InternalKey<C, Bytes>>,
    /// `I` only appears in the `StorageIterator<I, C>` bound, so it is anchored
    /// here to keep the type parameter used.
    _marker: PhantomData<I>,
}

impl<I, C, S> LogicalDedupIterator<I, C, S>
where
    I: Io,
    C: Comparer,
    S: StorageIterator<I, C>,
{
    /// Wraps `inner` in a deduplicating adapter.
    ///
    /// The adapter starts with no remembered key, so the first entry produced
    /// by `inner` is always passed through.
    pub fn new(inner: S) -> Self {
        Self {
            _marker: PhantomData,
            prev: None,
            inner,
        }
    }
}
/// A [`StorageIterator`] adapter that collapses multiple versions of the same
/// logical key into one, emitting only the first (newest) version seen.
///
/// Two keys are considered the same logical entry if [`C::compare_logical`]
/// returns [`Ordering::Equal`] - that is, their prefixes are equal regardless
/// of their suffix. This means that for a comparer with a version suffix (e.g.
/// an HLC timestamp), all versions of the same row are collapsed to the newest,
/// since physical ordering guarantees the newest suffix sorts first.
///
/// For [`DefaultComparer`], which has no suffix, this degenerates to simple
/// exact-key deduplication.
impl<I, C, S> StorageIterator<I, C> for LogicalDedupIterator<I, C, S>
where
    I: Io,
    C: Comparer,
    S: StorageIterator<I, C>,
{
    /// Returns the next entry whose key is not logically equal to the key of
    /// the previously emitted entry, or `Ok(None)` once the inner iterator is
    /// exhausted.
    ///
    /// # Errors
    ///
    /// Propagates any [`StorageError`] returned by the inner iterator.
    async fn next(&mut self) -> Result<Option<(InternalKey<C, Bytes>, Bytes)>, StorageError> {
        while let Some((key, value)) = self.inner.next().await? {
            if let Some(prev) = &self.prev
                && prev.compare_logical(&key).is_eq()
            {
                trace!(
                    key.len = key.key().len(),
                    seqnum = key.trailer().seqnum().get(),
                    "dedup_iter: skipping superseded version"
                );
                continue;
            }

            // First entry of a new logical key: remember it so that the
            // following versions of the same key are skipped.
            self.prev = Some(key.clone());
            return Ok(Some((key, value)));
        }
        Ok(None)
    }

    /// Repositions the inner iterator at `key`.
    ///
    /// The remembered key is cleared first: deduplication state describes the
    /// stream position before the seek, and keeping it would cause the first
    /// entry after the seek to be dropped whenever it is logically equal to the
    /// last entry emitted before the seek (e.g. when seeking back to the same
    /// key).
    ///
    /// # Errors
    ///
    /// Propagates any [`StorageError`] returned by the inner iterator.
    async fn seek(&mut self, key: InternalKey<C, Bytes>) -> Result<(), StorageError> {
        self.prev = None;
        self.inner.seek(key).await
    }
}

#[cfg(test)]
mod tests {
    use tempest_core::test_utils::setup_tracing;
    use tempest_io::VirtualIo;
    use tempest_rt::block_on;

    use crate::{
        base::{DefaultComparer, FixedSuffixComparer, KeyKind, KeyTrailer, SeqNum},
        iterator::mock::MockIterator,
    };

    use super::*;

    /// Two entries with the same user key but different trailers must collapse
    /// to the first one produced by the inner iterator.
    #[test]
    fn test_deduplicating_with_different_trailers() {
        block_on(VirtualIo::default(), async {
            let user_key = Bytes::from("user1");
            let newer = InternalKey::new(
                user_key.clone(),
                KeyTrailer::new(SeqNum::new(100).unwrap(), KeyKind::Put),
            );
            let older = InternalKey::new(
                user_key,
                KeyTrailer::new(SeqNum::new(50).unwrap(), KeyKind::Put),
            );

            let source = MockIterator::<DefaultComparer>::new()
                .add_with_key(newer, "new-val")
                .add_with_key(older, "old-val");
            let mut dedup = LogicalDedupIterator::<VirtualIo, _, _>::new(source);

            // The seqnum-100 entry was added first, so it is the one emitted.
            let (_, first_value) = dedup.next().await.unwrap().unwrap();
            assert_eq!(first_value, "new-val");

            // The seqnum-50 entry is dropped, leaving the stream exhausted.
            let remainder = dedup.next().await.unwrap();
            assert!(remainder.is_none());
        })
    }

    /// With a one-byte fixed suffix, "user1A" and "user1B" are expected to be
    /// versions of the same logical key, so only the first is emitted.
    #[test]
    fn test_deduplicating_with_fixed_suffix() {
        block_on(VirtualIo::default(), async {
            let suffix_a = InternalKey::new(
                Bytes::from("user1A"),
                KeyTrailer::new(SeqNum::new(50).unwrap(), KeyKind::Put),
            );
            let suffix_b = InternalKey::new(
                Bytes::from("user1B"),
                KeyTrailer::new(SeqNum::new(50).unwrap(), KeyKind::Put),
            );

            let source = MockIterator::<FixedSuffixComparer<1>>::new()
                .add_with_key(suffix_a, "val-version-A")
                .add_with_key(suffix_b, "val-version-B");
            let mut dedup = LogicalDedupIterator::<VirtualIo, _, _>::new(source);

            // A comes first in the inner iterator, so A is what gets yielded.
            let (_, first_value) = dedup.next().await.unwrap().unwrap();
            assert_eq!(first_value, "val-version-A",);

            // B is skipped because it shares the logical prefix "user1" with A.
            let remainder = dedup.next().await;
            assert!(matches!(remainder, Ok(None)));
        })
    }
}