// log/listing.rs
1//! Key listing for log streams.
2//!
3//! This module provides efficient key enumeration via per-segment listing records.
4//! When entries are appended, listing records track which keys are present in each
5//! segment. This enables key discovery without scanning all log entries.
6//!
7//! # Components
8//!
9//! - [`ListingReader`]: Read-only access to listing entries
10//! - [`ListingCache`]: In-memory cache for write-path delta building
11//! - [`ListingDelta`]: Delta representing listing state changes
12//! - [`LogKeyIterator`]: Iterator over keys from listing entries
13
14use std::collections::BTreeSet;
15use std::ops::Range;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use common::{Record, StorageRead};
20
21use crate::error::{Error, Result};
22use crate::model::SegmentId;
23use crate::segment::SegmentDelta;
24use crate::serde::{ListingEntryKey, ListingEntryValue};
25
26/// A key returned from the listing iterator.
27#[derive(Debug, Clone, PartialEq, Eq)]
28pub struct LogKey {
29    /// The user-provided key
30    pub key: Bytes,
31}
32
33impl LogKey {
34    /// Creates a new log key.
35    pub fn new(key: Bytes) -> Self {
36        Self { key }
37    }
38}
39
/// Iterator over keys from listing entries.
///
/// Iterates over distinct keys present in the specified segment range.
/// Keys are deduplicated and returned in lexicographic order.
pub struct LogKeyIterator {
    /// Iterator over keys.
    /// All keys are collected and deduplicated eagerly in `open()`; this is
    /// the consuming iterator over that pre-sorted set, so `next()` never
    /// touches storage.
    keys: std::collections::btree_set::IntoIter<Bytes>,
}
48
49impl LogKeyIterator {
50    /// Creates a new key iterator by scanning listing entries.
51    ///
52    /// This loads all keys from the segment range into memory for deduplication.
53    /// The async API is preserved for future streaming implementations.
54    pub(crate) async fn open(
55        storage: Arc<dyn StorageRead>,
56        segment_range: Range<SegmentId>,
57    ) -> Result<Self> {
58        // Empty range means no keys
59        if segment_range.start >= segment_range.end {
60            return Ok(Self {
61                keys: BTreeSet::new().into_iter(),
62            });
63        }
64
65        // Scan all listing entries in the segment range
66        let scan_range = ListingEntryKey::scan_range(segment_range);
67        let mut iter = storage
68            .scan_iter(scan_range)
69            .await
70            .map_err(|e| Error::Storage(e.to_string()))?;
71
72        // Collect keys into BTreeSet (deduplicates and sorts)
73        let mut keys = BTreeSet::new();
74        while let Some(record) = iter
75            .next()
76            .await
77            .map_err(|e| Error::Storage(e.to_string()))?
78        {
79            let entry_key = ListingEntryKey::deserialize(&record.key)?;
80            keys.insert(entry_key.key);
81        }
82
83        Ok(Self {
84            keys: keys.into_iter(),
85        })
86    }
87
88    /// Returns the next key, or `None` if exhausted.
89    pub async fn next(&mut self) -> Result<Option<LogKey>> {
90        Ok(self.keys.next().map(LogKey::new))
91    }
92}
93
/// Delta representing listing state changes.
///
/// Produced by [`ListingCache::build_delta`] and consumed by [`ListingCache::apply_delta`].
/// Holding the delta separately lets the caller defer the cache update until
/// the corresponding storage write has succeeded.
#[derive(Debug, Clone)]
pub(crate) struct ListingDelta {
    /// The segment ID these entries belong to.
    segment_id: SegmentId,
    /// Keys that are new to this segment (not previously cached and not
    /// duplicated within the batch that built this delta).
    new_keys: BTreeSet<Bytes>,
}
104
/// In-memory cache of keys seen in the current segment.
///
/// Used to build deltas for listing entries during ingestion, avoiding
/// duplicate writes for the same key within a segment. Only one segment is
/// tracked at a time; advancing to a new segment resets the key set.
///
/// # Usage
///
/// ```ignore
/// let mut records = Vec::new();
///
/// // Build delta and add listing records
/// let listing_delta = listing_cache.build_delta(&seg_delta, &keys, &mut records);
///
/// // ... write records to storage ...
///
/// // Apply delta to cache
/// listing_cache.apply_delta(listing_delta);
/// ```
pub(crate) struct ListingCache {
    /// The segment ID this cache is tracking.
    /// `None` until the first delta is applied.
    current_segment_id: Option<SegmentId>,
    /// Keys seen in the current segment.
    keys: BTreeSet<Bytes>,
}
129
130impl ListingCache {
131    /// Creates a new empty cache.
132    pub(crate) fn new() -> Self {
133        Self {
134            current_segment_id: None,
135            keys: BTreeSet::new(),
136        }
137    }
138
139    /// Builds a delta for the given keys.
140    ///
141    /// For each key that is new to the segment (not in cache and not seen
142    /// earlier in this batch), adds:
143    /// - The key to the returned delta
144    /// - A listing entry record to the records vec
145    ///
146    /// Does NOT update the cache - call `apply_delta()` after the storage
147    /// write succeeds.
148    pub(crate) fn build_delta(
149        &self,
150        seg_delta: &SegmentDelta,
151        keys: &[Bytes],
152        records: &mut Vec<Record>,
153    ) -> ListingDelta {
154        let segment_id = seg_delta.segment().id();
155        let mut new_keys = BTreeSet::new();
156        let value = ListingEntryValue::new().serialize();
157
158        for key in keys {
159            // Skip if already seen in this batch or cached
160            if new_keys.contains(key) || !self.is_new(segment_id, key) {
161                continue;
162            }
163
164            // Add listing entry record
165            let storage_key = ListingEntryKey::new(segment_id, key.clone()).serialize();
166            records.push(Record::new(storage_key, value.clone()));
167            new_keys.insert(key.clone());
168        }
169
170        ListingDelta {
171            segment_id,
172            new_keys,
173        }
174    }
175
176    /// Applies a delta to the cache, recording all new keys as seen.
177    ///
178    /// Call this after the storage write succeeds to update the cache
179    /// with the newly written keys.
180    ///
181    /// If the delta's segment differs from the cached segment, the cache
182    /// is reset before applying.
183    pub(crate) fn apply_delta(&mut self, delta: ListingDelta) {
184        if self.current_segment_id != Some(delta.segment_id) {
185            self.keys = delta.new_keys;
186            self.current_segment_id = Some(delta.segment_id);
187        } else {
188            self.keys.extend(delta.new_keys);
189        }
190    }
191
192    /// Checks if a key is new for the given segment.
193    fn is_new(&self, segment_id: SegmentId, key: &Bytes) -> bool {
194        if self.current_segment_id != Some(segment_id) {
195            return true;
196        }
197        !self.keys.contains(key)
198    }
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use crate::segment::LogSegment;
    use crate::serde::SegmentMeta;
    use crate::storage::LogStorage;

    // Builds a minimal segment with zeroed metadata for cache tests.
    fn test_segment(id: u32) -> LogSegment {
        LogSegment::new(id, SegmentMeta::new(0, 0))
    }

    // Wraps a test segment in a delta (second arg is presumably a
    // sealed/flush flag — confirm against SegmentDelta::new).
    fn test_seg_delta(segment_id: u32) -> SegmentDelta {
        SegmentDelta::new(test_segment(segment_id), false)
    }

    mod log_key_iterator {
        use super::*;

        // Writes a single listing entry record directly to storage,
        // bypassing the normal ingestion/cache path.
        async fn write_listing_entry(storage: &LogStorage, segment_id: u32, key: &[u8]) {
            let storage_key =
                ListingEntryKey::new(segment_id, Bytes::copy_from_slice(key)).serialize();
            let value = ListingEntryValue::new().serialize();
            storage
                .put_with_options(
                    vec![common::Record::new(storage_key, value)],
                    common::WriteOptions::default(),
                )
                .await
                .unwrap();
        }

        #[tokio::test]
        async fn should_return_empty_for_empty_range() {
            // given
            let storage = LogStorage::in_memory();

            // when - start == end is an empty range
            let mut iter = storage.as_read().list_keys(0..0).await.unwrap();

            // then
            assert!(iter.next().await.unwrap().is_none());
        }

        #[tokio::test]
        async fn should_return_empty_when_no_listing_entries() {
            // given - non-empty range but no entries written
            let storage = LogStorage::in_memory();

            // when
            let mut iter = storage.as_read().list_keys(0..10).await.unwrap();

            // then
            assert!(iter.next().await.unwrap().is_none());
        }

        #[tokio::test]
        async fn should_iterate_keys_in_single_segment() {
            // given
            let storage = LogStorage::in_memory();
            write_listing_entry(&storage, 0, b"key-a").await;
            write_listing_entry(&storage, 0, b"key-b").await;
            write_listing_entry(&storage, 0, b"key-c").await;

            // when
            let mut iter = storage.as_read().list_keys(0..1).await.unwrap();

            // then - keys returned in lexicographic order
            let mut keys = Vec::new();
            while let Some(key) = iter.next().await.unwrap() {
                keys.push(key.key);
            }
            assert_eq!(keys.len(), 3);
            assert_eq!(keys[0], Bytes::from("key-a"));
            assert_eq!(keys[1], Bytes::from("key-b"));
            assert_eq!(keys[2], Bytes::from("key-c"));
        }

        #[tokio::test]
        async fn should_iterate_keys_across_multiple_segments() {
            // given - one distinct key per segment
            let storage = LogStorage::in_memory();
            write_listing_entry(&storage, 0, b"key-a").await;
            write_listing_entry(&storage, 1, b"key-b").await;
            write_listing_entry(&storage, 2, b"key-c").await;

            // when
            let mut iter = storage.as_read().list_keys(0..3).await.unwrap();

            // then
            let mut keys = Vec::new();
            while let Some(key) = iter.next().await.unwrap() {
                keys.push(key.key);
            }
            assert_eq!(keys.len(), 3);
        }

        #[tokio::test]
        async fn should_deduplicate_keys_across_segments() {
            // given - same key in multiple segments
            let storage = LogStorage::in_memory();
            write_listing_entry(&storage, 0, b"shared-key").await;
            write_listing_entry(&storage, 1, b"shared-key").await;
            write_listing_entry(&storage, 2, b"shared-key").await;

            // when
            let mut iter = storage.as_read().list_keys(0..3).await.unwrap();

            // then - only one instance of the key
            let mut keys = Vec::new();
            while let Some(key) = iter.next().await.unwrap() {
                keys.push(key.key);
            }
            assert_eq!(keys.len(), 1);
            assert_eq!(keys[0], Bytes::from("shared-key"));
        }

        #[tokio::test]
        async fn should_respect_segment_range() {
            // given - entries in segments 0 through 3
            let storage = LogStorage::in_memory();
            write_listing_entry(&storage, 0, b"key-0").await;
            write_listing_entry(&storage, 1, b"key-1").await;
            write_listing_entry(&storage, 2, b"key-2").await;
            write_listing_entry(&storage, 3, b"key-3").await;

            // when - only query segments 1..3 (end is exclusive)
            let mut iter = storage.as_read().list_keys(1..3).await.unwrap();

            // then - only keys from segments 1 and 2
            let mut keys = Vec::new();
            while let Some(key) = iter.next().await.unwrap() {
                keys.push(key.key);
            }
            assert_eq!(keys.len(), 2);
            assert_eq!(keys[0], Bytes::from("key-1"));
            assert_eq!(keys[1], Bytes::from("key-2"));
        }

        #[tokio::test]
        async fn should_return_keys_in_lexicographic_order() {
            // given - keys inserted out of order
            let storage = LogStorage::in_memory();
            write_listing_entry(&storage, 0, b"zebra").await;
            write_listing_entry(&storage, 0, b"apple").await;
            write_listing_entry(&storage, 0, b"mango").await;

            // when
            let mut iter = storage.as_read().list_keys(0..1).await.unwrap();

            // then - keys returned in lexicographic order
            let mut keys = Vec::new();
            while let Some(key) = iter.next().await.unwrap() {
                keys.push(key.key);
            }
            assert_eq!(keys[0], Bytes::from("apple"));
            assert_eq!(keys[1], Bytes::from("mango"));
            assert_eq!(keys[2], Bytes::from("zebra"));
        }
    }

    mod listing_cache {
        use super::*;

        #[test]
        fn should_build_delta_with_new_keys() {
            // given - empty cache, so every key is new
            let cache = ListingCache::new();
            let seg_delta = test_seg_delta(0);
            let keys = vec![Bytes::from("key1"), Bytes::from("key2")];
            let mut records = Vec::new();

            // when
            let delta = cache.build_delta(&seg_delta, &keys, &mut records);

            // then - one delta key and one record per input key
            assert_eq!(delta.new_keys.len(), 2);
            assert_eq!(records.len(), 2);
        }

        #[test]
        fn should_exclude_cached_keys_from_delta() {
            // given
            let mut cache = ListingCache::new();
            let seg_delta = test_seg_delta(0);
            let keys1 = vec![Bytes::from("key1"), Bytes::from("key2")];
            let mut records1 = Vec::new();

            // First batch - applying the delta marks key1/key2 as cached
            let delta1 = cache.build_delta(&seg_delta, &keys1, &mut records1);
            cache.apply_delta(delta1);

            // when - second batch with overlap
            let keys2 = vec![Bytes::from("key2"), Bytes::from("key3")];
            let mut records2 = Vec::new();
            let delta2 = cache.build_delta(&seg_delta, &keys2, &mut records2);

            // then - only key3 is new
            assert_eq!(delta2.new_keys.len(), 1);
            assert!(delta2.new_keys.contains(&Bytes::from("key3")));
            assert_eq!(records2.len(), 1);
        }

        #[test]
        fn should_dedupe_keys_within_batch() {
            // given - a duplicate inside a single batch
            let cache = ListingCache::new();
            let seg_delta = test_seg_delta(0);
            let keys = vec![
                Bytes::from("key1"),
                Bytes::from("key2"),
                Bytes::from("key1"), // duplicate
            ];
            let mut records = Vec::new();

            // when
            let delta = cache.build_delta(&seg_delta, &keys, &mut records);

            // then - duplicate produces no extra key or record
            assert_eq!(delta.new_keys.len(), 2);
            assert_eq!(records.len(), 2);
        }

        #[test]
        fn should_include_all_keys_after_segment_change() {
            // given
            let mut cache = ListingCache::new();
            let keys = vec![Bytes::from("key1"), Bytes::from("key2")];

            // First segment
            let seg_delta0 = test_seg_delta(0);
            let mut records0 = Vec::new();
            let delta0 = cache.build_delta(&seg_delta0, &keys, &mut records0);
            cache.apply_delta(delta0);

            // when - new segment with same keys
            let seg_delta1 = test_seg_delta(1);
            let mut records1 = Vec::new();
            let delta1 = cache.build_delta(&seg_delta1, &keys, &mut records1);

            // then - all keys are new in new segment
            assert_eq!(delta1.new_keys.len(), 2);
            assert_eq!(records1.len(), 2);
        }

        #[test]
        fn should_clear_cache_when_applying_delta_for_new_segment() {
            // given - cache tracking segment 0 with key1
            let mut cache = ListingCache::new();
            let seg_delta0 = test_seg_delta(0);
            let mut records0 = Vec::new();
            let delta0 = cache.build_delta(&seg_delta0, &[Bytes::from("key1")], &mut records0);
            cache.apply_delta(delta0);

            // when - apply delta for different segment
            let seg_delta1 = test_seg_delta(1);
            let mut records1 = Vec::new();
            let delta1 = cache.build_delta(&seg_delta1, &[Bytes::from("key2")], &mut records1);
            cache.apply_delta(delta1);

            // then - key1 should be new again (cache cleared)
            assert!(cache.is_new(1, &Bytes::from("key1")));
            // but key2 should not be new
            assert!(!cache.is_new(1, &Bytes::from("key2")));
        }

        #[test]
        fn should_create_correct_listing_entry_records() {
            // given
            let cache = ListingCache::new();
            let seg_delta = test_seg_delta(42);
            let keys = vec![Bytes::from("mykey")];
            let mut records = Vec::new();

            // when
            cache.build_delta(&seg_delta, &keys, &mut records);

            // then - record key/value match the serde layer's serialization
            assert_eq!(records.len(), 1);
            let expected_key = ListingEntryKey::new(42, Bytes::from("mykey")).serialize();
            let expected_value = ListingEntryValue::new().serialize();
            assert_eq!(records[0].key, expected_key);
            assert_eq!(records[0].value, expected_value);
        }
    }
}