1use std::collections::BTreeSet;
15use std::ops::Range;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use common::{Record, StorageRead};
20
21use crate::error::{Error, Result};
22use crate::model::SegmentId;
23use crate::segment::SegmentDelta;
24use crate::serde::{ListingEntryKey, ListingEntryValue};
25
26#[derive(Debug, Clone, PartialEq, Eq)]
28pub struct LogKey {
29 pub key: Bytes,
31}
32
33impl LogKey {
34 pub fn new(key: Bytes) -> Self {
36 Self { key }
37 }
38}
39
40pub struct LogKeyIterator {
45 keys: std::collections::btree_set::IntoIter<Bytes>,
47}
48
49impl LogKeyIterator {
50 pub(crate) async fn open(
55 storage: Arc<dyn StorageRead>,
56 segment_range: Range<SegmentId>,
57 ) -> Result<Self> {
58 if segment_range.start >= segment_range.end {
60 return Ok(Self {
61 keys: BTreeSet::new().into_iter(),
62 });
63 }
64
65 let scan_range = ListingEntryKey::scan_range(segment_range);
67 let mut iter = storage
68 .scan_iter(scan_range)
69 .await
70 .map_err(|e| Error::Storage(e.to_string()))?;
71
72 let mut keys = BTreeSet::new();
74 while let Some(record) = iter
75 .next()
76 .await
77 .map_err(|e| Error::Storage(e.to_string()))?
78 {
79 let entry_key = ListingEntryKey::deserialize(&record.key)?;
80 keys.insert(entry_key.key);
81 }
82
83 Ok(Self {
84 keys: keys.into_iter(),
85 })
86 }
87
88 pub async fn next(&mut self) -> Result<Option<LogKey>> {
90 Ok(self.keys.next().map(LogKey::new))
91 }
92}
93
94#[derive(Debug, Clone)]
98pub(crate) struct ListingDelta {
99 segment_id: SegmentId,
101 new_keys: BTreeSet<Bytes>,
103}
104
105pub(crate) struct ListingCache {
124 current_segment_id: Option<SegmentId>,
126 keys: BTreeSet<Bytes>,
128}
129
130impl ListingCache {
131 pub(crate) fn new() -> Self {
133 Self {
134 current_segment_id: None,
135 keys: BTreeSet::new(),
136 }
137 }
138
139 pub(crate) fn build_delta(
149 &self,
150 seg_delta: &SegmentDelta,
151 keys: &[Bytes],
152 records: &mut Vec<Record>,
153 ) -> ListingDelta {
154 let segment_id = seg_delta.segment().id();
155 let mut new_keys = BTreeSet::new();
156 let value = ListingEntryValue::new().serialize();
157
158 for key in keys {
159 if new_keys.contains(key) || !self.is_new(segment_id, key) {
161 continue;
162 }
163
164 let storage_key = ListingEntryKey::new(segment_id, key.clone()).serialize();
166 records.push(Record::new(storage_key, value.clone()));
167 new_keys.insert(key.clone());
168 }
169
170 ListingDelta {
171 segment_id,
172 new_keys,
173 }
174 }
175
176 pub(crate) fn apply_delta(&mut self, delta: ListingDelta) {
184 if self.current_segment_id != Some(delta.segment_id) {
185 self.keys = delta.new_keys;
186 self.current_segment_id = Some(delta.segment_id);
187 } else {
188 self.keys.extend(delta.new_keys);
189 }
190 }
191
192 fn is_new(&self, segment_id: SegmentId, key: &Bytes) -> bool {
194 if self.current_segment_id != Some(segment_id) {
195 return true;
196 }
197 !self.keys.contains(key)
198 }
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use crate::segment::LogSegment;
    use crate::serde::SegmentMeta;
    use crate::storage::LogStorage;

    /// Builds a segment with the given id and `SegmentMeta::new(0, 0)`.
    fn test_segment(id: u32) -> LogSegment {
        LogSegment::new(id, SegmentMeta::new(0, 0))
    }

    /// Builds a segment delta around [`test_segment`] with the flag set to `false`.
    fn test_seg_delta(segment_id: u32) -> SegmentDelta {
        SegmentDelta::new(test_segment(segment_id), false)
    }

    mod log_key_iterator {
        use super::*;

        /// Writes a single listing entry for `key` under `segment_id`.
        async fn write_listing_entry(storage: &LogStorage, segment_id: u32, key: &[u8]) {
            let storage_key =
                ListingEntryKey::new(segment_id, Bytes::copy_from_slice(key)).serialize();
            let value = ListingEntryValue::new().serialize();
            storage
                .put_with_options(
                    vec![common::Record::new(storage_key, value)],
                    common::WriteOptions::default(),
                )
                .await
                .unwrap();
        }

        /// Lists the keys of `segments` and drains the iterator into a vector,
        /// preserving the order in which keys are yielded.
        async fn list_all_keys(
            storage: &LogStorage,
            segments: std::ops::Range<u32>,
        ) -> Vec<Bytes> {
            let mut iter = storage.as_read().list_keys(segments).await.unwrap();
            let mut keys = Vec::new();
            while let Some(entry) = iter.next().await.unwrap() {
                keys.push(entry.key);
            }
            keys
        }

        #[tokio::test]
        async fn should_return_empty_for_empty_range() {
            let storage = LogStorage::in_memory();

            let keys = list_all_keys(&storage, 0..0).await;

            assert_eq!(keys, Vec::<Bytes>::new());
        }

        #[tokio::test]
        async fn should_return_empty_when_no_listing_entries() {
            let storage = LogStorage::in_memory();

            let keys = list_all_keys(&storage, 0..10).await;

            assert_eq!(keys, Vec::<Bytes>::new());
        }

        #[tokio::test]
        async fn should_iterate_keys_in_single_segment() {
            let storage = LogStorage::in_memory();
            write_listing_entry(&storage, 0, b"key-a").await;
            write_listing_entry(&storage, 0, b"key-b").await;
            write_listing_entry(&storage, 0, b"key-c").await;

            let keys = list_all_keys(&storage, 0..1).await;

            assert_eq!(
                keys,
                vec![
                    Bytes::from("key-a"),
                    Bytes::from("key-b"),
                    Bytes::from("key-c"),
                ]
            );
        }

        #[tokio::test]
        async fn should_iterate_keys_across_multiple_segments() {
            let storage = LogStorage::in_memory();
            write_listing_entry(&storage, 0, b"key-a").await;
            write_listing_entry(&storage, 1, b"key-b").await;
            write_listing_entry(&storage, 2, b"key-c").await;

            let keys = list_all_keys(&storage, 0..3).await;

            // Check the contents as well as the count so that a wrong or repeated
            // key cannot slip through.
            assert_eq!(
                keys,
                vec![
                    Bytes::from("key-a"),
                    Bytes::from("key-b"),
                    Bytes::from("key-c"),
                ]
            );
        }

        #[tokio::test]
        async fn should_deduplicate_keys_across_segments() {
            let storage = LogStorage::in_memory();
            write_listing_entry(&storage, 0, b"shared-key").await;
            write_listing_entry(&storage, 1, b"shared-key").await;
            write_listing_entry(&storage, 2, b"shared-key").await;

            let keys = list_all_keys(&storage, 0..3).await;

            assert_eq!(keys, vec![Bytes::from("shared-key")]);
        }

        #[tokio::test]
        async fn should_respect_segment_range() {
            let storage = LogStorage::in_memory();
            write_listing_entry(&storage, 0, b"key-0").await;
            write_listing_entry(&storage, 1, b"key-1").await;
            write_listing_entry(&storage, 2, b"key-2").await;
            write_listing_entry(&storage, 3, b"key-3").await;

            let keys = list_all_keys(&storage, 1..3).await;

            assert_eq!(keys, vec![Bytes::from("key-1"), Bytes::from("key-2")]);
        }

        #[tokio::test]
        async fn should_return_keys_in_lexicographic_order() {
            let storage = LogStorage::in_memory();
            write_listing_entry(&storage, 0, b"zebra").await;
            write_listing_entry(&storage, 0, b"apple").await;
            write_listing_entry(&storage, 0, b"mango").await;

            let keys = list_all_keys(&storage, 0..1).await;

            // Comparing the whole vector reports a missing key as a normal assertion
            // failure instead of an index-out-of-bounds panic.
            assert_eq!(
                keys,
                vec![
                    Bytes::from("apple"),
                    Bytes::from("mango"),
                    Bytes::from("zebra"),
                ]
            );
        }
    }

    mod listing_cache {
        use super::*;

        #[test]
        fn should_build_delta_with_new_keys() {
            let cache = ListingCache::new();
            let seg_delta = test_seg_delta(0);
            let keys = vec![Bytes::from("key1"), Bytes::from("key2")];
            let mut records = Vec::new();

            let delta = cache.build_delta(&seg_delta, &keys, &mut records);

            assert_eq!(delta.new_keys.len(), 2);
            assert_eq!(records.len(), 2);
        }

        #[test]
        fn should_exclude_cached_keys_from_delta() {
            let mut cache = ListingCache::new();
            let seg_delta = test_seg_delta(0);
            let keys1 = vec![Bytes::from("key1"), Bytes::from("key2")];
            let mut records1 = Vec::new();

            let delta1 = cache.build_delta(&seg_delta, &keys1, &mut records1);
            cache.apply_delta(delta1);

            // `key2` is already cached for segment 0, so only `key3` is new.
            let keys2 = vec![Bytes::from("key2"), Bytes::from("key3")];
            let mut records2 = Vec::new();
            let delta2 = cache.build_delta(&seg_delta, &keys2, &mut records2);

            assert_eq!(delta2.new_keys.len(), 1);
            assert!(delta2.new_keys.contains(&Bytes::from("key3")));
            assert_eq!(records2.len(), 1);
        }

        #[test]
        fn should_dedupe_keys_within_batch() {
            let cache = ListingCache::new();
            let seg_delta = test_seg_delta(0);
            let keys = vec![
                Bytes::from("key1"),
                Bytes::from("key2"),
                // Repeats the first key within the same batch.
                Bytes::from("key1"),
            ];
            let mut records = Vec::new();

            let delta = cache.build_delta(&seg_delta, &keys, &mut records);

            assert_eq!(delta.new_keys.len(), 2);
            assert_eq!(records.len(), 2);
        }

        #[test]
        fn should_include_all_keys_after_segment_change() {
            let mut cache = ListingCache::new();
            let keys = vec![Bytes::from("key1"), Bytes::from("key2")];

            let seg_delta0 = test_seg_delta(0);
            let mut records0 = Vec::new();
            let delta0 = cache.build_delta(&seg_delta0, &keys, &mut records0);
            cache.apply_delta(delta0);

            // The same keys under a different segment must all be reported again.
            let seg_delta1 = test_seg_delta(1);
            let mut records1 = Vec::new();
            let delta1 = cache.build_delta(&seg_delta1, &keys, &mut records1);

            assert_eq!(delta1.new_keys.len(), 2);
            assert_eq!(records1.len(), 2);
        }

        #[test]
        fn should_clear_cache_when_applying_delta_for_new_segment() {
            let mut cache = ListingCache::new();
            let seg_delta0 = test_seg_delta(0);
            let mut records0 = Vec::new();
            let delta0 = cache.build_delta(&seg_delta0, &[Bytes::from("key1")], &mut records0);
            cache.apply_delta(delta0);

            let seg_delta1 = test_seg_delta(1);
            let mut records1 = Vec::new();
            let delta1 = cache.build_delta(&seg_delta1, &[Bytes::from("key2")], &mut records1);
            cache.apply_delta(delta1);

            // `key1` was only cached for segment 0, which is no longer tracked.
            assert!(cache.is_new(1, &Bytes::from("key1")));
            assert!(!cache.is_new(1, &Bytes::from("key2")));
        }

        #[test]
        fn should_create_correct_listing_entry_records() {
            let cache = ListingCache::new();
            let seg_delta = test_seg_delta(42);
            let keys = vec![Bytes::from("mykey")];
            let mut records = Vec::new();

            cache.build_delta(&seg_delta, &keys, &mut records);

            assert_eq!(records.len(), 1);
            let expected_key = ListingEntryKey::new(42, Bytes::from("mykey")).serialize();
            let expected_value = ListingEntryValue::new().serialize();
            assert_eq!(records[0].key, expected_key);
            assert_eq!(records[0].value, expected_value);
        }
    }
}