use std::{
marker::PhantomData,
task::{Context, Poll},
};
use bytes::Bytes;
use tempest_io::Io;
use crate::{
StorageError,
base::{Comparer, InternalKey},
iterator::StorageIterator,
};
pub(crate) struct LogicalDedupIterator<I, C, S>
where
I: Io,
C: Comparer,
S: StorageIterator<I, C>,
{
inner: S,
prev: Option<InternalKey<C, Bytes>>,
_marker: PhantomData<I>,
}
impl<I, C, S> LogicalDedupIterator<I, C, S>
where
I: Io,
C: Comparer,
S: StorageIterator<I, C>,
{
pub fn new(inner: S) -> Self {
Self {
inner,
prev: None,
_marker: PhantomData,
}
}
}
impl<I, C, S> StorageIterator<I, C> for LogicalDedupIterator<I, C, S>
where
I: Io,
C: Comparer,
S: StorageIterator<I, C>,
{
async fn next(&mut self) -> Result<Option<(InternalKey<C, Bytes>, Bytes)>, StorageError> {
while let Some((key, value)) = self.inner.next().await? {
if let Some(prev) = &self.prev
&& prev.compare_logical(&key).is_eq()
{
trace!(
key.len = key.key().len(),
seqnum = key.trailer().seqnum().get(),
"dedup_iter: skipping superseded version"
);
continue;
} else {
self.prev = Some(key.clone());
}
return Ok(Some((key, value)));
}
Ok(None)
}
async fn seek(&mut self, key: InternalKey<C, Bytes>) -> Result<(), StorageError> {
self.inner.seek(key).await
}
}
#[cfg(test)]
mod tests {
use tempest_core::test_utils::setup_tracing;
use tempest_io::VirtualIo;
use tempest_rt::block_on;
use crate::{
base::{DefaultComparer, FixedSuffixComparer, KeyKind, KeyTrailer, SeqNum},
iterator::mock::MockIterator,
};
use super::*;
#[test]
fn test_deduplicating_with_different_trailers() {
block_on(VirtualIo::default(), async {
let key_a_bytes = Bytes::from("user1");
let key_a_v2 = InternalKey::new(
key_a_bytes.clone(),
KeyTrailer::new(SeqNum::new(100).unwrap(), KeyKind::Put),
);
let key_a_v1 = InternalKey::new(
key_a_bytes,
KeyTrailer::new(SeqNum::new(50).unwrap(), KeyKind::Put),
);
let inner = MockIterator::<DefaultComparer>::new()
.add_with_key(key_a_v2, "new-val")
.add_with_key(key_a_v1, "old-val");
let mut deduplicating_iter = LogicalDedupIterator::<VirtualIo, _, _>::new(inner);
assert_eq!(
deduplicating_iter.next().await.unwrap().unwrap().1,
"new-val"
);
assert!(deduplicating_iter.next().await.unwrap().is_none());
})
}
#[test]
fn test_deduplicating_with_fixed_suffix() {
block_on(VirtualIo::default(), async {
let key_v1 = InternalKey::new(
Bytes::from("user1A"),
KeyTrailer::new(SeqNum::new(50).unwrap(), KeyKind::Put),
);
let key_v2 = InternalKey::new(
Bytes::from("user1B"),
KeyTrailer::new(SeqNum::new(50).unwrap(), KeyKind::Put),
);
let inner = MockIterator::<FixedSuffixComparer<1>>::new()
.add_with_key(key_v1, "val-version-A")
.add_with_key(key_v2, "val-version-B");
let mut deduplicating_iter = LogicalDedupIterator::<VirtualIo, _, _>::new(inner);
assert_eq!(
deduplicating_iter.next().await.unwrap().unwrap().1,
"val-version-A",
);
assert!(matches!(deduplicating_iter.next().await, Ok(None)));
})
}
}