1use std::collections::BTreeSet;
13
14use bytes::Bytes;
15use common::PutRecordOp;
16use common::Ttl::NoExpiry;
17use common::storage::PutOptions;
18
19use crate::error::Result;
20use crate::model::SegmentId;
21use crate::serde::{ListingEntryKey, ListingEntryValue};
22
/// A single user-visible key discovered in the log's listing index.
///
/// Thin wrapper around the raw key bytes; ordering/equality follow `Bytes`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LogKey {
    /// Raw key bytes as originally written by the caller.
    pub key: Bytes,
}
29
30impl LogKey {
31 pub fn new(key: Bytes) -> Self {
33 Self { key }
34 }
35}
36
/// Iterator over the distinct keys of a log, yielded in lexicographic order.
///
/// Backed by an already-materialized `BTreeSet`, so iteration itself cannot
/// fail; the async/`Result` surface of `next` exists for API stability.
pub struct LogKeyIterator {
    // Owning iterator over the sorted, deduplicated key set.
    keys: std::collections::btree_set::IntoIter<Bytes>,
}
45
46impl LogKeyIterator {
47 pub(crate) fn from_keys(keys: BTreeSet<Bytes>) -> Self {
49 Self {
50 keys: keys.into_iter(),
51 }
52 }
53
54 pub async fn next(&mut self) -> Result<Option<LogKey>> {
56 Ok(self.keys.next().map(LogKey::new))
57 }
58}
59
/// Per-segment cache of keys that already have a listing entry written.
///
/// Used to avoid emitting duplicate listing records for the same key within
/// the currently active segment; the cache is cleared whenever the segment
/// changes (see `assign_new_keys`).
pub(crate) struct ListingCache {
    // Segment the cached `keys` belong to; `None` until the first batch.
    current_segment_id: Option<SegmentId>,
    // Keys already assigned a listing entry in `current_segment_id`.
    keys: BTreeSet<Bytes>,
}
70
71impl ListingCache {
72 pub(crate) fn new() -> Self {
74 Self {
75 current_segment_id: None,
76 keys: BTreeSet::new(),
77 }
78 }
79
80 pub(crate) fn assign_new_keys(
91 &mut self,
92 segment_id: SegmentId,
93 keys: &[Bytes],
94 records: &mut Vec<PutRecordOp>,
95 ) -> Vec<Bytes> {
96 if self.current_segment_id != Some(segment_id) {
97 self.keys.clear();
98 self.current_segment_id = Some(segment_id);
99 }
100
101 let value = ListingEntryValue::new().serialize();
102 let mut new_keys = Vec::new();
103
104 for key in keys {
105 if self.keys.contains(key) {
106 continue;
107 }
108
109 let storage_key = ListingEntryKey::new(segment_id, key.clone()).serialize();
110 let record = common::Record::new(storage_key, value.clone());
111 records.push(PutRecordOp::new_with_options(
112 record,
113 PutOptions { ttl: NoExpiry },
114 ));
115 self.keys.insert(key.clone());
116 new_keys.push(key.clone());
117 }
118
119 new_keys
120 }
121
122 #[cfg(test)]
124 fn is_new(&self, segment_id: SegmentId, key: &Bytes) -> bool {
125 if self.current_segment_id != Some(segment_id) {
126 return true;
127 }
128 !self.keys.contains(key)
129 }
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::LogStorageRead;
    use common::Storage;

    mod log_key_iterator {
        use opendata_macros::storage_test;

        use super::*;

        // NOTE(review): `Arc` is not imported in this module; presumably the
        // `storage_test` macro expansion brings it into scope — confirm.

        // Writes one listing entry (segment_id, key) with the canonical empty
        // value so that `list_keys` can later discover `key` in that segment.
        async fn write_listing_entry(storage: &dyn common::Storage, segment_id: u32, key: &[u8]) {
            let storage_key =
                ListingEntryKey::new(segment_id, Bytes::copy_from_slice(key)).serialize();
            let value = ListingEntryValue::new().serialize();
            storage
                .put_with_options(
                    vec![common::Record::new(storage_key, value).into()],
                    common::WriteOptions::default(),
                )
                .await
                .unwrap();
        }

        // An empty segment range yields no keys at all.
        #[storage_test]
        async fn should_return_empty_for_empty_range(storage: Arc<dyn Storage>) {
            let keys = storage.list_keys(0..0).await.unwrap();

            assert!(keys.is_empty());
        }

        // A non-empty range over storage with no entries is still empty.
        #[storage_test]
        async fn should_return_empty_when_no_listing_entries(storage: Arc<dyn Storage>) {
            let keys = storage.list_keys(0..10).await.unwrap();

            assert!(keys.is_empty());
        }

        // All keys written to one segment come back, in lexicographic order.
        #[storage_test]
        async fn should_iterate_keys_in_single_segment(storage: Arc<dyn Storage>) {
            write_listing_entry(&*storage, 0, b"key-a").await;
            write_listing_entry(&*storage, 0, b"key-b").await;
            write_listing_entry(&*storage, 0, b"key-c").await;

            let keys: Vec<Bytes> = storage.list_keys(0..1).await.unwrap().into_iter().collect();

            assert_eq!(keys.len(), 3);
            assert_eq!(keys[0], Bytes::from("key-a"));
            assert_eq!(keys[1], Bytes::from("key-b"));
            assert_eq!(keys[2], Bytes::from("key-c"));
        }

        // Keys spread across several segments are merged into one listing.
        #[storage_test]
        async fn should_iterate_keys_across_multiple_segments(storage: Arc<dyn Storage>) {
            write_listing_entry(&*storage, 0, b"key-a").await;
            write_listing_entry(&*storage, 1, b"key-b").await;
            write_listing_entry(&*storage, 2, b"key-c").await;

            let keys = storage.list_keys(0..3).await.unwrap();

            assert_eq!(keys.len(), 3);
        }

        // The same key written to multiple segments appears only once.
        #[storage_test]
        async fn should_deduplicate_keys_across_segments(storage: Arc<dyn Storage>) {
            write_listing_entry(&*storage, 0, b"shared-key").await;
            write_listing_entry(&*storage, 1, b"shared-key").await;
            write_listing_entry(&*storage, 2, b"shared-key").await;

            let keys = storage.list_keys(0..3).await.unwrap();

            assert_eq!(keys.len(), 1);
            assert!(keys.contains(&Bytes::from("shared-key")));
        }

        // Only segments inside the half-open range contribute keys
        // (1..3 selects segments 1 and 2, excluding 0 and 3).
        #[storage_test]
        async fn should_respect_segment_range(storage: Arc<dyn Storage>) {
            write_listing_entry(&*storage, 0, b"key-0").await;
            write_listing_entry(&*storage, 1, b"key-1").await;
            write_listing_entry(&*storage, 2, b"key-2").await;
            write_listing_entry(&*storage, 3, b"key-3").await;

            let keys: Vec<Bytes> = storage.list_keys(1..3).await.unwrap().into_iter().collect();

            assert_eq!(keys.len(), 2);
            assert_eq!(keys[0], Bytes::from("key-1"));
            assert_eq!(keys[1], Bytes::from("key-2"));
        }

        // Keys come back sorted regardless of write order — presumably
        // because list_keys collects into a BTreeSet (see LogKeyIterator).
        #[storage_test]
        async fn should_return_keys_in_lexicographic_order(storage: Arc<dyn Storage>) {
            write_listing_entry(&*storage, 0, b"zebra").await;
            write_listing_entry(&*storage, 0, b"apple").await;
            write_listing_entry(&*storage, 0, b"mango").await;

            let keys: Vec<Bytes> = storage.list_keys(0..1).await.unwrap().into_iter().collect();

            assert_eq!(keys[0], Bytes::from("apple"));
            assert_eq!(keys[1], Bytes::from("mango"));
            assert_eq!(keys[2], Bytes::from("zebra"));
        }
    }

    mod listing_cache {
        use super::*;

        // Fresh cache: every key in the batch is new, so one record each.
        #[test]
        fn should_add_records_for_new_keys() {
            let mut cache = ListingCache::new();
            let keys = vec![Bytes::from("key1"), Bytes::from("key2")];
            let mut records = Vec::new();

            cache.assign_new_keys(0, &keys, &mut records);

            assert_eq!(records.len(), 2);
        }

        // Keys already seen for the same segment must not produce records;
        // only the genuinely new "key3" does.
        #[test]
        fn should_exclude_cached_keys() {
            let mut cache = ListingCache::new();
            let mut records1 = Vec::new();
            cache.assign_new_keys(
                0,
                &[Bytes::from("key1"), Bytes::from("key2")],
                &mut records1,
            );

            let mut records2 = Vec::new();
            cache.assign_new_keys(
                0,
                &[Bytes::from("key2"), Bytes::from("key3")],
                &mut records2,
            );

            assert_eq!(records2.len(), 1);
        }

        // A duplicate within a single batch is filtered out as well.
        #[test]
        fn should_dedupe_keys_within_batch() {
            let mut cache = ListingCache::new();
            let keys = vec![
                Bytes::from("key1"),
                Bytes::from("key2"),
                Bytes::from("key1"), ];
            let mut records = Vec::new();

            cache.assign_new_keys(0, &keys, &mut records);

            assert_eq!(records.len(), 2);
        }

        // Switching segments resets deduplication: the same keys are emitted
        // again for the new segment.
        #[test]
        fn should_include_all_keys_after_segment_change() {
            let mut cache = ListingCache::new();
            let keys = vec![Bytes::from("key1"), Bytes::from("key2")];
            let mut records0 = Vec::new();
            cache.assign_new_keys(0, &keys, &mut records0);

            let mut records1 = Vec::new();
            cache.assign_new_keys(1, &keys, &mut records1);

            assert_eq!(records1.len(), 2);
        }

        // After a segment change, only keys from the new segment are cached:
        // "key1" (from segment 0) is new again, "key2" is not.
        #[test]
        fn should_clear_cache_when_segment_changes() {
            let mut cache = ListingCache::new();
            let mut records0 = Vec::new();
            cache.assign_new_keys(0, &[Bytes::from("key1")], &mut records0);

            let mut records1 = Vec::new();
            cache.assign_new_keys(1, &[Bytes::from("key2")], &mut records1);

            assert!(cache.is_new(1, &Bytes::from("key1")));
            assert!(!cache.is_new(1, &Bytes::from("key2")));
        }

        // The emitted record must carry the serialized listing key/value for
        // the given segment, stored with no expiry.
        #[test]
        fn should_create_correct_listing_entry_records() {
            let mut cache = ListingCache::new();
            let keys = vec![Bytes::from("mykey")];
            let mut records = Vec::new();

            cache.assign_new_keys(42, &keys, &mut records);

            assert_eq!(records.len(), 1);
            let expected_key = ListingEntryKey::new(42, Bytes::from("mykey")).serialize();
            let expected_value = ListingEntryValue::new().serialize();
            assert_eq!(records[0].record.key, expected_key);
            assert_eq!(records[0].record.value, expected_value);
            assert_eq!(records[0].options, PutOptions { ttl: NoExpiry });
        }
    }
}