// log/listing.rs
1//! Key listing for log streams.
2//!
3//! This module provides efficient key enumeration via per-segment listing records.
4//! When entries are appended, listing records track which keys are present in each
5//! segment. This enables key discovery without scanning all log entries.
6//!
7//! # Components
8//!
9//! - [`ListingCache`]: In-memory cache for deduplicating listing entries on the write path
10//! - [`LogKeyIterator`]: Iterator over keys from listing entries
11
12use std::collections::BTreeSet;
13
14use bytes::Bytes;
15use common::PutRecordOp;
16use common::Ttl::NoExpiry;
17use common::storage::PutOptions;
18
19use crate::error::Result;
20use crate::model::SegmentId;
21use crate::serde::{ListingEntryKey, ListingEntryValue};
22
/// A key returned from the listing iterator.
///
/// Thin wrapper around the raw user-provided key bytes, produced by
/// [`LogKeyIterator::next`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LogKey {
    /// The user-provided key
    pub key: Bytes,
}
29
30impl LogKey {
31    /// Creates a new log key.
32    pub fn new(key: Bytes) -> Self {
33        Self { key }
34    }
35}
36
/// Iterator over keys from listing entries.
///
/// Iterates over distinct keys present in the specified segment range.
/// Keys are deduplicated and returned in lexicographic order (the natural
/// ordering of the backing `BTreeSet`).
pub struct LogKeyIterator {
    /// Consuming iterator over the pre-deduplicated, sorted key set.
    keys: std::collections::btree_set::IntoIter<Bytes>,
}
45
46impl LogKeyIterator {
47    /// Creates a key iterator from a pre-built set of keys.
48    pub(crate) fn from_keys(keys: BTreeSet<Bytes>) -> Self {
49        Self {
50            keys: keys.into_iter(),
51        }
52    }
53
54    /// Returns the next key, or `None` if exhausted.
55    pub async fn next(&mut self) -> Result<Option<LogKey>> {
56        Ok(self.keys.next().map(LogKey::new))
57    }
58}
59
/// In-memory cache of keys seen in the current segment.
///
/// Used to generate listing entry records during ingestion, avoiding
/// duplicate writes for the same key within a segment. Tracks at most one
/// segment at a time; switching to a different segment resets the cache.
pub(crate) struct ListingCache {
    /// The segment ID this cache is tracking (`None` until first use).
    current_segment_id: Option<SegmentId>,
    /// Keys seen in the current segment.
    keys: BTreeSet<Bytes>,
}
70
71impl ListingCache {
72    /// Creates a new empty cache.
73    pub(crate) fn new() -> Self {
74        Self {
75            current_segment_id: None,
76            keys: BTreeSet::new(),
77        }
78    }
79
80    /// Assigns listing entries for the given keys, returning the new ones.
81    ///
82    /// For each key that is new to the segment (not already cached and not
83    /// a duplicate within this batch), appends a listing entry record to
84    /// `records` and updates the cache.
85    ///
86    /// If `segment_id` differs from the cached segment, the cache is reset.
87    ///
88    /// Returns the keys that were new to this segment (i.e., produced listing
89    /// records). Callers can use this to track segment->key associations.
90    pub(crate) fn assign_new_keys(
91        &mut self,
92        segment_id: SegmentId,
93        keys: &[Bytes],
94        records: &mut Vec<PutRecordOp>,
95    ) -> Vec<Bytes> {
96        if self.current_segment_id != Some(segment_id) {
97            self.keys.clear();
98            self.current_segment_id = Some(segment_id);
99        }
100
101        let value = ListingEntryValue::new().serialize();
102        let mut new_keys = Vec::new();
103
104        for key in keys {
105            if self.keys.contains(key) {
106                continue;
107            }
108
109            let storage_key = ListingEntryKey::new(segment_id, key.clone()).serialize();
110            let record = common::Record::new(storage_key, value.clone());
111            records.push(PutRecordOp::new_with_options(
112                record,
113                PutOptions { ttl: NoExpiry },
114            ));
115            self.keys.insert(key.clone());
116            new_keys.push(key.clone());
117        }
118
119        new_keys
120    }
121
122    /// Checks if a key is new for the given segment.
123    #[cfg(test)]
124    fn is_new(&self, segment_id: SegmentId, key: &Bytes) -> bool {
125        if self.current_segment_id != Some(segment_id) {
126            return true;
127        }
128        !self.keys.contains(key)
129    }
130}
131
132#[cfg(test)]
133mod tests {
134    use super::*;
135    use crate::storage::LogStorageRead;
136    use common::Storage;
137
138    mod log_key_iterator {
139        use opendata_macros::storage_test;
140
141        use super::*;
142
143        async fn write_listing_entry(storage: &dyn common::Storage, segment_id: u32, key: &[u8]) {
144            let storage_key =
145                ListingEntryKey::new(segment_id, Bytes::copy_from_slice(key)).serialize();
146            let value = ListingEntryValue::new().serialize();
147            storage
148                .put_with_options(
149                    vec![common::Record::new(storage_key, value).into()],
150                    common::WriteOptions::default(),
151                )
152                .await
153                .unwrap();
154        }
155
156        #[storage_test]
157        async fn should_return_empty_for_empty_range(storage: Arc<dyn Storage>) {
158            // when
159            let keys = storage.list_keys(0..0).await.unwrap();
160
161            // then
162            assert!(keys.is_empty());
163        }
164
165        #[storage_test]
166        async fn should_return_empty_when_no_listing_entries(storage: Arc<dyn Storage>) {
167            // when
168            let keys = storage.list_keys(0..10).await.unwrap();
169
170            // then
171            assert!(keys.is_empty());
172        }
173
174        #[storage_test]
175        async fn should_iterate_keys_in_single_segment(storage: Arc<dyn Storage>) {
176            // given
177            write_listing_entry(&*storage, 0, b"key-a").await;
178            write_listing_entry(&*storage, 0, b"key-b").await;
179            write_listing_entry(&*storage, 0, b"key-c").await;
180
181            // when
182            let keys: Vec<Bytes> = storage.list_keys(0..1).await.unwrap().into_iter().collect();
183
184            // then - keys returned in lexicographic order
185            assert_eq!(keys.len(), 3);
186            assert_eq!(keys[0], Bytes::from("key-a"));
187            assert_eq!(keys[1], Bytes::from("key-b"));
188            assert_eq!(keys[2], Bytes::from("key-c"));
189        }
190
191        #[storage_test]
192        async fn should_iterate_keys_across_multiple_segments(storage: Arc<dyn Storage>) {
193            // given
194            write_listing_entry(&*storage, 0, b"key-a").await;
195            write_listing_entry(&*storage, 1, b"key-b").await;
196            write_listing_entry(&*storage, 2, b"key-c").await;
197
198            // when
199            let keys = storage.list_keys(0..3).await.unwrap();
200
201            // then
202            assert_eq!(keys.len(), 3);
203        }
204
205        #[storage_test]
206        async fn should_deduplicate_keys_across_segments(storage: Arc<dyn Storage>) {
207            // given - same key in multiple segments
208            write_listing_entry(&*storage, 0, b"shared-key").await;
209            write_listing_entry(&*storage, 1, b"shared-key").await;
210            write_listing_entry(&*storage, 2, b"shared-key").await;
211
212            // when
213            let keys = storage.list_keys(0..3).await.unwrap();
214
215            // then - only one instance of the key
216            assert_eq!(keys.len(), 1);
217            assert!(keys.contains(&Bytes::from("shared-key")));
218        }
219
220        #[storage_test]
221        async fn should_respect_segment_range(storage: Arc<dyn Storage>) {
222            // given
223            write_listing_entry(&*storage, 0, b"key-0").await;
224            write_listing_entry(&*storage, 1, b"key-1").await;
225            write_listing_entry(&*storage, 2, b"key-2").await;
226            write_listing_entry(&*storage, 3, b"key-3").await;
227
228            // when - only query segments 1..3
229            let keys: Vec<Bytes> = storage.list_keys(1..3).await.unwrap().into_iter().collect();
230
231            // then - only keys from segments 1 and 2
232            assert_eq!(keys.len(), 2);
233            assert_eq!(keys[0], Bytes::from("key-1"));
234            assert_eq!(keys[1], Bytes::from("key-2"));
235        }
236
237        #[storage_test]
238        async fn should_return_keys_in_lexicographic_order(storage: Arc<dyn Storage>) {
239            // given - keys inserted out of order
240            write_listing_entry(&*storage, 0, b"zebra").await;
241            write_listing_entry(&*storage, 0, b"apple").await;
242            write_listing_entry(&*storage, 0, b"mango").await;
243
244            // when
245            let keys: Vec<Bytes> = storage.list_keys(0..1).await.unwrap().into_iter().collect();
246
247            // then - keys returned in lexicographic order
248            assert_eq!(keys[0], Bytes::from("apple"));
249            assert_eq!(keys[1], Bytes::from("mango"));
250            assert_eq!(keys[2], Bytes::from("zebra"));
251        }
252    }
253
254    mod listing_cache {
255        use super::*;
256
257        #[test]
258        fn should_add_records_for_new_keys() {
259            // given
260            let mut cache = ListingCache::new();
261            let keys = vec![Bytes::from("key1"), Bytes::from("key2")];
262            let mut records = Vec::new();
263
264            // when
265            cache.assign_new_keys(0, &keys, &mut records);
266
267            // then
268            assert_eq!(records.len(), 2);
269        }
270
271        #[test]
272        fn should_exclude_cached_keys() {
273            // given
274            let mut cache = ListingCache::new();
275            let mut records1 = Vec::new();
276            cache.assign_new_keys(
277                0,
278                &[Bytes::from("key1"), Bytes::from("key2")],
279                &mut records1,
280            );
281
282            // when - second batch with overlap
283            let mut records2 = Vec::new();
284            cache.assign_new_keys(
285                0,
286                &[Bytes::from("key2"), Bytes::from("key3")],
287                &mut records2,
288            );
289
290            // then - only key3 is new
291            assert_eq!(records2.len(), 1);
292        }
293
294        #[test]
295        fn should_dedupe_keys_within_batch() {
296            // given
297            let mut cache = ListingCache::new();
298            let keys = vec![
299                Bytes::from("key1"),
300                Bytes::from("key2"),
301                Bytes::from("key1"), // duplicate
302            ];
303            let mut records = Vec::new();
304
305            // when
306            cache.assign_new_keys(0, &keys, &mut records);
307
308            // then
309            assert_eq!(records.len(), 2);
310        }
311
312        #[test]
313        fn should_include_all_keys_after_segment_change() {
314            // given
315            let mut cache = ListingCache::new();
316            let keys = vec![Bytes::from("key1"), Bytes::from("key2")];
317            let mut records0 = Vec::new();
318            cache.assign_new_keys(0, &keys, &mut records0);
319
320            // when - new segment with same keys
321            let mut records1 = Vec::new();
322            cache.assign_new_keys(1, &keys, &mut records1);
323
324            // then - all keys are new in new segment
325            assert_eq!(records1.len(), 2);
326        }
327
328        #[test]
329        fn should_clear_cache_when_segment_changes() {
330            // given
331            let mut cache = ListingCache::new();
332            let mut records0 = Vec::new();
333            cache.assign_new_keys(0, &[Bytes::from("key1")], &mut records0);
334
335            // when - different segment
336            let mut records1 = Vec::new();
337            cache.assign_new_keys(1, &[Bytes::from("key2")], &mut records1);
338
339            // then - key1 should be new again (cache cleared)
340            assert!(cache.is_new(1, &Bytes::from("key1")));
341            // but key2 should not be new
342            assert!(!cache.is_new(1, &Bytes::from("key2")));
343        }
344
345        #[test]
346        fn should_create_correct_listing_entry_records() {
347            // given
348            let mut cache = ListingCache::new();
349            let keys = vec![Bytes::from("mykey")];
350            let mut records = Vec::new();
351
352            // when
353            cache.assign_new_keys(42, &keys, &mut records);
354
355            // then
356            assert_eq!(records.len(), 1);
357            let expected_key = ListingEntryKey::new(42, Bytes::from("mykey")).serialize();
358            let expected_value = ListingEntryValue::new().serialize();
359            assert_eq!(records[0].record.key, expected_key);
360            assert_eq!(records[0].record.value, expected_value);
361            assert_eq!(records[0].options, PutOptions { ttl: NoExpiry });
362        }
363    }
364}