use std::collections::BTreeSet;
use bytes::Bytes;
use common::PutRecordOp;
use common::Ttl::NoExpiry;
use common::storage::PutOptions;
use crate::error::Result;
use crate::model::SegmentId;
use crate::serde::{ListingEntryKey, ListingEntryValue};
/// A single user-visible key read back from the log's listing entries.
///
/// Yielded by [`LogKeyIterator`]; wraps the raw key bytes exactly as they
/// were written.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LogKey {
    // Raw key bytes as originally supplied by the writer.
    pub key: Bytes,
}
impl LogKey {
pub fn new(key: Bytes) -> Self {
Self { key }
}
}
/// Asynchronous-style iterator over the distinct keys of a segment range.
///
/// Backed by an in-memory, already-materialized set, so iteration order is
/// the `BTreeSet` order (lexicographic by key bytes).
pub struct LogKeyIterator {
    // Owning iterator over the pre-collected, deduplicated key set.
    keys: std::collections::btree_set::IntoIter<Bytes>,
}
impl LogKeyIterator {
    /// Builds an iterator from an already-deduplicated, sorted key set.
    pub(crate) fn from_keys(keys: BTreeSet<Bytes>) -> Self {
        let keys = keys.into_iter();
        Self { keys }
    }

    /// Returns the next key in lexicographic order, or `None` when exhausted.
    ///
    /// Always succeeds here; the `async`/`Result` shape presumably mirrors the
    /// wider storage-iterator API — confirm against sibling iterators.
    pub async fn next(&mut self) -> Result<Option<LogKey>> {
        match self.keys.next() {
            Some(bytes) => Ok(Some(LogKey::new(bytes))),
            None => Ok(None),
        }
    }
}
/// Tracks which keys have already had a listing entry written for the
/// current segment, so duplicate entries are not re-emitted.
///
/// The cache only holds keys for one segment at a time: changing segments
/// discards the previous segment's keys (see `assign_new_keys`).
pub(crate) struct ListingCache {
    // Segment the cached keys belong to; `None` until first use.
    current_segment_id: Option<SegmentId>,
    // Keys already assigned a listing entry in `current_segment_id`.
    keys: BTreeSet<Bytes>,
}
impl ListingCache {
    /// Creates an empty cache bound to no segment yet.
    pub(crate) fn new() -> Self {
        Self {
            current_segment_id: None,
            keys: BTreeSet::new(),
        }
    }

    /// Registers `keys` for `segment_id`, appending one listing-entry
    /// `PutRecordOp` to `records` for each key not already cached.
    ///
    /// Switching to a different segment clears the cache first, so every key
    /// counts as new for the new segment. Returns the keys that were new;
    /// duplicates within `keys` are reported only once.
    pub(crate) fn assign_new_keys(
        &mut self,
        segment_id: SegmentId,
        keys: &[Bytes],
        records: &mut Vec<PutRecordOp>,
    ) -> Vec<Bytes> {
        if self.current_segment_id != Some(segment_id) {
            self.keys.clear();
            self.current_segment_id = Some(segment_id);
        }
        // All listing entries share the same serialized value payload.
        let value = ListingEntryValue::new().serialize();
        let mut new_keys = Vec::new();
        for key in keys {
            // `insert` returns false when the key is already present, which
            // replaces the previous `contains` + `insert` pair (one tree
            // lookup per key instead of two).
            if !self.keys.insert(key.clone()) {
                continue;
            }
            let storage_key = ListingEntryKey::new(segment_id, key.clone()).serialize();
            let record = common::Record::new(storage_key, value.clone());
            // Listing entries never expire on their own.
            records.push(PutRecordOp::new_with_options(
                record,
                PutOptions { ttl: NoExpiry },
            ));
            new_keys.push(key.clone());
        }
        new_keys
    }

    /// Test-only helper: true when `key` would be treated as new for
    /// `segment_id` (different segment, or not yet cached).
    #[cfg(test)]
    fn is_new(&self, segment_id: SegmentId, key: &Bytes) -> bool {
        if self.current_segment_id != Some(segment_id) {
            return true;
        }
        !self.keys.contains(key)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::LogStorageRead;
    use common::Storage;

    /// Integration-style tests that drive `LogKeyIterator` through the public
    /// `list_keys` API against a storage backend supplied by `#[storage_test]`.
    ///
    /// NOTE(review): `Arc` is not imported by any visible `use` in this file;
    /// presumably the `#[storage_test]` macro brings `std::sync::Arc` into
    /// scope when it expands — confirm.
    mod log_key_iterator {
        use opendata_macros::storage_test;

        use super::*;

        /// Writes a single listing entry for (`segment_id`, `key`) with the
        /// standard empty value directly into `storage`, bypassing the cache.
        async fn write_listing_entry(storage: &dyn common::Storage, segment_id: u32, key: &[u8]) {
            let storage_key =
                ListingEntryKey::new(segment_id, Bytes::copy_from_slice(key)).serialize();
            let value = ListingEntryValue::new().serialize();
            storage
                .put_with_options(
                    vec![common::Record::new(storage_key, value).into()],
                    common::WriteOptions::default(),
                )
                .await
                .unwrap();
        }

        // An empty segment range yields no keys regardless of storage contents.
        #[storage_test]
        async fn should_return_empty_for_empty_range(storage: Arc<dyn Storage>) {
            let keys = storage.list_keys(0..0).await.unwrap();
            assert!(keys.is_empty());
        }

        // A non-empty range over storage with no listing entries yields nothing.
        #[storage_test]
        async fn should_return_empty_when_no_listing_entries(storage: Arc<dyn Storage>) {
            let keys = storage.list_keys(0..10).await.unwrap();
            assert!(keys.is_empty());
        }

        // All keys written to one segment come back, in lexicographic order.
        #[storage_test]
        async fn should_iterate_keys_in_single_segment(storage: Arc<dyn Storage>) {
            write_listing_entry(&*storage, 0, b"key-a").await;
            write_listing_entry(&*storage, 0, b"key-b").await;
            write_listing_entry(&*storage, 0, b"key-c").await;
            let keys: Vec<Bytes> = storage.list_keys(0..1).await.unwrap().into_iter().collect();
            assert_eq!(keys.len(), 3);
            assert_eq!(keys[0], Bytes::from("key-a"));
            assert_eq!(keys[1], Bytes::from("key-b"));
            assert_eq!(keys[2], Bytes::from("key-c"));
        }

        // Keys spread over several segments are all visible in one listing.
        #[storage_test]
        async fn should_iterate_keys_across_multiple_segments(storage: Arc<dyn Storage>) {
            write_listing_entry(&*storage, 0, b"key-a").await;
            write_listing_entry(&*storage, 1, b"key-b").await;
            write_listing_entry(&*storage, 2, b"key-c").await;
            let keys = storage.list_keys(0..3).await.unwrap();
            assert_eq!(keys.len(), 3);
        }

        // The same key written to multiple segments is reported exactly once.
        #[storage_test]
        async fn should_deduplicate_keys_across_segments(storage: Arc<dyn Storage>) {
            write_listing_entry(&*storage, 0, b"shared-key").await;
            write_listing_entry(&*storage, 1, b"shared-key").await;
            write_listing_entry(&*storage, 2, b"shared-key").await;
            let keys = storage.list_keys(0..3).await.unwrap();
            assert_eq!(keys.len(), 1);
            assert!(keys.contains(&Bytes::from("shared-key")));
        }

        // Only segments inside the half-open range [1, 3) contribute keys.
        #[storage_test]
        async fn should_respect_segment_range(storage: Arc<dyn Storage>) {
            write_listing_entry(&*storage, 0, b"key-0").await;
            write_listing_entry(&*storage, 1, b"key-1").await;
            write_listing_entry(&*storage, 2, b"key-2").await;
            write_listing_entry(&*storage, 3, b"key-3").await;
            let keys: Vec<Bytes> = storage.list_keys(1..3).await.unwrap().into_iter().collect();
            assert_eq!(keys.len(), 2);
            assert_eq!(keys[0], Bytes::from("key-1"));
            assert_eq!(keys[1], Bytes::from("key-2"));
        }

        // Output order is lexicographic by key bytes, not insertion order.
        #[storage_test]
        async fn should_return_keys_in_lexicographic_order(storage: Arc<dyn Storage>) {
            write_listing_entry(&*storage, 0, b"zebra").await;
            write_listing_entry(&*storage, 0, b"apple").await;
            write_listing_entry(&*storage, 0, b"mango").await;
            let keys: Vec<Bytes> = storage.list_keys(0..1).await.unwrap().into_iter().collect();
            assert_eq!(keys[0], Bytes::from("apple"));
            assert_eq!(keys[1], Bytes::from("mango"));
            assert_eq!(keys[2], Bytes::from("zebra"));
        }
    }

    /// Unit tests for `ListingCache`'s dedup and segment-rollover behavior.
    mod listing_cache {
        use super::*;

        // Every previously-unseen key produces one PutRecordOp.
        #[test]
        fn should_add_records_for_new_keys() {
            let mut cache = ListingCache::new();
            let keys = vec![Bytes::from("key1"), Bytes::from("key2")];
            let mut records = Vec::new();
            cache.assign_new_keys(0, &keys, &mut records);
            assert_eq!(records.len(), 2);
        }

        // Keys cached from an earlier batch in the same segment are skipped.
        #[test]
        fn should_exclude_cached_keys() {
            let mut cache = ListingCache::new();
            let mut records1 = Vec::new();
            cache.assign_new_keys(
                0,
                &[Bytes::from("key1"), Bytes::from("key2")],
                &mut records1,
            );
            let mut records2 = Vec::new();
            cache.assign_new_keys(
                0,
                &[Bytes::from("key2"), Bytes::from("key3")],
                &mut records2,
            );
            // Only "key3" is new; "key2" was cached by the first call.
            assert_eq!(records2.len(), 1);
        }

        // A key repeated inside a single batch yields only one record.
        #[test]
        fn should_dedupe_keys_within_batch() {
            let mut cache = ListingCache::new();
            let keys = vec![
                Bytes::from("key1"),
                Bytes::from("key2"),
                Bytes::from("key1"),
            ];
            let mut records = Vec::new();
            cache.assign_new_keys(0, &keys, &mut records);
            assert_eq!(records.len(), 2);
        }

        // Moving to a new segment makes previously-seen keys new again.
        #[test]
        fn should_include_all_keys_after_segment_change() {
            let mut cache = ListingCache::new();
            let keys = vec![Bytes::from("key1"), Bytes::from("key2")];
            let mut records0 = Vec::new();
            cache.assign_new_keys(0, &keys, &mut records0);
            let mut records1 = Vec::new();
            cache.assign_new_keys(1, &keys, &mut records1);
            assert_eq!(records1.len(), 2);
        }

        // Segment change drops the old segment's keys from the cache.
        #[test]
        fn should_clear_cache_when_segment_changes() {
            let mut cache = ListingCache::new();
            let mut records0 = Vec::new();
            cache.assign_new_keys(0, &[Bytes::from("key1")], &mut records0);
            let mut records1 = Vec::new();
            cache.assign_new_keys(1, &[Bytes::from("key2")], &mut records1);
            // "key1" was cleared with segment 0; "key2" is cached for segment 1.
            assert!(cache.is_new(1, &Bytes::from("key1")));
            assert!(!cache.is_new(1, &Bytes::from("key2")));
        }

        // The emitted record carries the serialized listing key/value and a
        // no-expiry TTL.
        #[test]
        fn should_create_correct_listing_entry_records() {
            let mut cache = ListingCache::new();
            let keys = vec![Bytes::from("mykey")];
            let mut records = Vec::new();
            cache.assign_new_keys(42, &keys, &mut records);
            assert_eq!(records.len(), 1);
            let expected_key = ListingEntryKey::new(42, Bytes::from("mykey")).serialize();
            let expected_value = ListingEntryValue::new().serialize();
            assert_eq!(records[0].record.key, expected_key);
            assert_eq!(records[0].record.value, expected_value);
            assert_eq!(records[0].options, PutOptions { ttl: NoExpiry });
        }
    }
}