Skip to main content

commonware_utils/bitmap/roaring/
prunable.rs

//! Container-aligned prunable wrapper around [`Bitmap`].
//!
//! [`Prunable`] adds a "pruned-below" watermark to a [`Bitmap`]. Values strictly less
//! than the watermark are guaranteed not to exist in the bitmap, and querying or inserting
//! such values is treated as a programming bug and panics.
//!
//! # Granularity
//!
//! Pruning is **container-aligned**: thresholds are rounded down to the nearest multiple of
//! 65536 (the size of a roaring container). [`Prunable::prune`] returns the watermark in
//! effect after the call (the rounded-down threshold, or the existing watermark if the call
//! was a no-op), which the caller can also read later via [`Prunable::pruned_below`].
//!
//! For example, `prune(70_000)` will:
//!
//! - Drop the container at key `0` (covering values `0..65536`).
//! - Keep the container at key `1` (covering values `65536..131072`) entirely, including
//!   any values in the range `65536..70000` that the caller asked to prune.
//! - Set `pruned_below()` to `65536`.
//!
//! In other words, some values "below" the requested threshold may linger in the boundary
//! container. Callers wanting strict semantics can pass a container-aligned threshold
//! (e.g. `threshold & !0xFFFF`, or that value rounded up to the next boundary).
//!
//! # Invariant
//!
//! After construction or any operation, the underlying bitmap contains no values
//! `< pruned_below`. The [`Read`] impl validates this on decode.
28
29use super::Bitmap;
30use bytes::{Buf, BufMut};
31use commonware_codec::{EncodeSize, Error as CodecError, RangeCfg, Read, ReadExt, Write};
32use core::ops::Range;
33
/// How many distinct values a single container spans (2^16). Every pruning watermark is a
/// multiple of this.
const CONTAINER_SIZE: u64 = 1u64 << 16;

/// Bit mask selecting a value's offset inside its container (the low 16 bits).
const CONTAINER_MASK: u64 = CONTAINER_SIZE - 1;
39
40/// A [`Bitmap`] paired with a "pruned-below" watermark.
41///
42/// See the module-level documentation for semantics and granularity.
43#[derive(Clone, Debug, Default, PartialEq, Eq)]
44pub struct Prunable {
45    /// The underlying bitmap. Invariant: contains no values `< pruned_below`.
46    bitmap: Bitmap,
47
48    /// The lowest value that may exist in the bitmap. Always a multiple of 65536.
49    pruned_below: u64,
50}
51
52impl Prunable {
53    /// Creates an empty prunable bitmap with no pruning applied.
54    pub const fn new() -> Self {
55        Self {
56            bitmap: Bitmap::new(),
57            pruned_below: 0,
58        }
59    }
60
61    /// Returns the cardinality (number of values currently in the set).
62    ///
63    /// Pruning subtracts from this: dropping a container with N values reduces `len()` by N.
64    /// This differs from [`super::super::Prunable::len`] (the dense `BitMap<N>` wrapper),
65    /// which preserves length across pruning because positions are meaningful in a sequential
66    /// bit array. Here, only present values are counted.
67    pub fn len(&self) -> u64 {
68        self.bitmap.len()
69    }
70
71    /// Returns whether the bitmap is empty.
72    pub fn is_empty(&self) -> bool {
73        self.bitmap.is_empty()
74    }
75
76    /// Returns the current pruning watermark.
77    ///
78    /// Always a multiple of 65536. No value `< pruned_below` exists in the bitmap.
79    pub const fn pruned_below(&self) -> u64 {
80        self.pruned_below
81    }
82
83    /// Inserts a value into the bitmap. Returns `true` if newly inserted.
84    ///
85    /// # Panics
86    ///
87    /// Panics if `value < pruned_below()`.
88    pub fn insert(&mut self, value: u64) -> bool {
89        assert!(
90            value >= self.pruned_below,
91            "value pruned: {value} < pruned_below {}",
92            self.pruned_below
93        );
94        self.bitmap.insert(value)
95    }
96
97    /// Inserts a range of values into the bitmap. Returns the number of values newly inserted.
98    ///
99    /// # Panics
100    ///
101    /// Panics if `start < pruned_below()`.
102    pub fn insert_range(&mut self, range: Range<u64>) -> u64 {
103        let Range { start, end } = range;
104        if start >= end {
105            return 0;
106        }
107        assert!(
108            start >= self.pruned_below,
109            "start pruned: {start} < pruned_below {}",
110            self.pruned_below
111        );
112        self.bitmap.insert_range(start..end)
113    }
114
115    /// Checks if the bitmap contains the given value.
116    ///
117    /// # Panics
118    ///
119    /// Panics if `value < pruned_below()`.
120    pub fn contains(&self, value: u64) -> bool {
121        assert!(
122            value >= self.pruned_below,
123            "value pruned: {value} < pruned_below {}",
124            self.pruned_below
125        );
126        self.bitmap.contains(value)
127    }
128
129    /// Prunes all values strictly less than `threshold`, rounding down to the nearest
130    /// container boundary (multiple of 65536).
131    ///
132    /// Returns the threshold actually applied (the rounded-down value), which is also the
133    /// new value of [`Self::pruned_below`].
134    ///
135    /// No-op if the rounded threshold is `<= pruned_below()`.
136    pub fn prune(&mut self, threshold: u64) -> u64 {
137        let aligned = threshold & !CONTAINER_MASK;
138        if aligned <= self.pruned_below {
139            return self.pruned_below;
140        }
141        let target_key = aligned >> 16;
142        self.bitmap.truncate_containers_below(target_key);
143        self.pruned_below = aligned;
144        aligned
145    }
146
147    /// Returns the minimum value in the bitmap, if any.
148    pub fn min(&self) -> Option<u64> {
149        self.bitmap.min()
150    }
151
152    /// Returns the maximum value in the bitmap, if any.
153    pub fn max(&self) -> Option<u64> {
154        self.bitmap.max()
155    }
156
157    /// Returns an iterator over the values in sorted order.
158    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
159        self.bitmap.iter()
160    }
161
162    /// Returns an iterator over the values in the range in sorted order.
163    ///
164    /// # Panics
165    ///
166    /// Panics if `start < pruned_below()`.
167    pub fn iter_range(&self, range: Range<u64>) -> impl Iterator<Item = u64> + '_ {
168        let Range { start, end } = range;
169        if start >= end {
170            return self.bitmap.iter_range(0..0);
171        }
172        assert!(
173            start >= self.pruned_below,
174            "start pruned: {start} < pruned_below {}",
175            self.pruned_below
176        );
177        self.bitmap.iter_range(start..end)
178    }
179
180    /// Clears all values from the bitmap. Does not reset [`Self::pruned_below`].
181    pub fn clear(&mut self) {
182        self.bitmap.clear();
183    }
184}
185
186impl Extend<u64> for Prunable {
187    /// Inserts each value from the iterator. Panics if any value is below
188    /// [`Self::pruned_below`], matching [`Self::insert`].
189    fn extend<I: IntoIterator<Item = u64>>(&mut self, iter: I) {
190        for value in iter {
191            self.insert(value);
192        }
193    }
194}
195
196impl FromIterator<u64> for Prunable {
197    fn from_iter<I: IntoIterator<Item = u64>>(iter: I) -> Self {
198        let mut p = Self::new();
199        p.extend(iter);
200        p
201    }
202}
203
204impl Write for Prunable {
205    fn write(&self, buf: &mut impl BufMut) {
206        self.pruned_below.write(buf);
207        self.bitmap.write(buf);
208    }
209}
210
211impl EncodeSize for Prunable {
212    fn encode_size(&self) -> usize {
213        self.pruned_below.encode_size() + self.bitmap.encode_size()
214    }
215}
216
217impl Read for Prunable {
218    /// Configuration for decoding: range limit on number of containers (passed through to
219    /// the underlying [`Bitmap`]).
220    type Cfg = RangeCfg<usize>;
221
222    fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
223        let pruned_below = u64::read(buf)?;
224        if pruned_below & CONTAINER_MASK != 0 {
225            return Err(CodecError::Invalid(
226                "Prunable",
227                "pruned_below must be a multiple of 65536",
228            ));
229        }
230
231        // Validate invariant: no container key < pruned_below >> 16.
232        let bitmap = Bitmap::read_cfg(buf, cfg)?;
233        let target_key = pruned_below >> 16;
234        if let Some((&first_key, _)) = bitmap.containers.first_key_value() {
235            if first_key < target_key {
236                return Err(CodecError::Invalid(
237                    "Prunable",
238                    "container key below pruned_below",
239                ));
240            }
241        }
242        Ok(Self {
243            bitmap,
244            pruned_below,
245        })
246    }
247}
248
#[cfg(feature = "arbitrary")]
impl arbitrary::Arbitrary<'_> for Prunable {
    /// Generates an arbitrary bitmap and then prunes it at an arbitrary threshold no larger
    /// than its maximum value (or `0` when the bitmap is empty).
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        // Fully qualified so the call resolves even though the `Arbitrary` trait is not
        // imported into this module.
        let bitmap = <Bitmap as arbitrary::Arbitrary<'_>>::arbitrary(u)?;
        let mut p = Self {
            bitmap,
            pruned_below: 0,
        };

        // Pick an arbitrary threshold within the bitmap's value range and apply it.
        // Going through `prune` keeps the watermark aligned and the bitmap consistent
        // with it.
        let max = p.bitmap.max().unwrap_or(0);
        let threshold = u.int_in_range(0..=max)?;
        p.prune(threshold);
        Ok(p)
    }
}
265
/// Unit tests covering insertion, pruning semantics, panics below the watermark, iteration,
/// and codec round-trips and validation.
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_codec::{Decode, Encode};

    #[test]
    fn test_new_and_empty() {
        let p = Prunable::new();
        assert!(p.is_empty());
        assert_eq!(p.len(), 0);
        assert_eq!(p.pruned_below(), 0);
    }

    #[test]
    fn test_insert_and_contains() {
        let mut p = Prunable::new();
        assert!(p.insert(42));
        assert!(p.insert(1_000_000));
        assert!(!p.insert(42));
        assert!(p.contains(42));
        assert!(p.contains(1_000_000));
        assert!(!p.contains(43));
        assert_eq!(p.len(), 2);
    }

    #[test]
    fn test_prune_drops_containers() {
        let mut p = Prunable::new();
        p.insert(100); // container 0
        p.insert(70_000); // container 1
        p.insert(140_000); // container 2
        p.insert(330_000); // container 5
        assert_eq!(p.len(), 4);

        // Prune at the start of container 2 (boundary = 2 * 65536 = 131_072).
        let returned = p.prune(131_072);
        assert_eq!(returned, 131_072);
        assert_eq!(p.pruned_below(), 131_072);
        assert_eq!(p.len(), 2);
        assert!(p.contains(140_000));
        assert!(p.contains(330_000));
    }

    #[test]
    fn test_prune_rounds_down() {
        let mut p = Prunable::new();
        p.insert(100); // container 0
        p.insert(70_000); // container 1

        // Threshold lies inside container 1 (boundary = 65_536).
        // Container 0 dropped; container 1 (with 70_000) preserved.
        let returned = p.prune(70_000);
        assert_eq!(returned, 65_536);
        assert_eq!(p.pruned_below(), 65_536);
        assert_eq!(p.len(), 1);
        assert!(p.contains(70_000));
    }

    #[test]
    fn test_prune_partial_container_lingers() {
        let mut p = Prunable::new();
        p.insert(65_500); // container 0, near top
        p.insert(66_000); // container 1, below the requested threshold
        p.insert(70_000); // container 1, exactly at the requested threshold

        // prune(70_000) rounds down to 65_536, drops container 0 only.
        assert_eq!(p.prune(70_000), 65_536);
        // 65_500 (was in dropped container 0) is gone.
        // We can't call contains(65_500) directly (it'd panic), so check the inner bitmap.
        assert!(!p.bitmap.contains(65_500));
        // 66_000 is below the requested threshold but lives in the boundary container,
        // so it lingers.
        assert!(p.contains(66_000));
        // 70_000 is still in container 1.
        assert!(p.contains(70_000));
        assert_eq!(p.len(), 2);
    }

    #[test]
    fn test_prune_idempotent_below_watermark() {
        let mut p = Prunable::new();
        p.insert(70_000);
        p.prune(65_536);
        assert_eq!(p.pruned_below(), 65_536);
        // Calling with a smaller threshold returns the existing watermark.
        let returned = p.prune(0);
        assert_eq!(returned, 65_536);
        assert_eq!(p.pruned_below(), 65_536);
    }

    #[test]
    fn test_prune_then_insert_above() {
        let mut p = Prunable::new();
        p.prune(131_072);
        p.insert(200_000);
        assert!(p.contains(200_000));
        assert_eq!(p.len(), 1);
    }

    #[test]
    fn test_prune_zero_no_op() {
        let mut p = Prunable::new();
        p.insert(100);
        let returned = p.prune(0);
        assert_eq!(returned, 0);
        assert_eq!(p.pruned_below(), 0);
        assert_eq!(p.len(), 1);
    }

    #[test]
    #[should_panic(expected = "value pruned")]
    fn test_panic_on_insert_below_pruned() {
        let mut p = Prunable::new();
        p.prune(131_072);
        p.insert(50);
    }

    #[test]
    #[should_panic(expected = "value pruned")]
    fn test_panic_on_contains_below_pruned() {
        let mut p = Prunable::new();
        p.prune(131_072);
        let _ = p.contains(50);
    }

    #[test]
    #[should_panic(expected = "start pruned")]
    fn test_panic_on_insert_range_below_pruned() {
        let mut p = Prunable::new();
        p.prune(131_072);
        p.insert_range(50..100);
    }

    #[test]
    #[should_panic(expected = "start pruned")]
    fn test_panic_on_iter_range_below_pruned() {
        let mut p = Prunable::new();
        p.prune(131_072);
        let _ = p.iter_range(50..200_000);
    }

    #[test]
    fn test_empty_insert_ranges_below_pruned_are_no_ops() {
        let mut p = Prunable::new();
        p.insert(200_000);
        p.prune(131_072);

        assert_eq!(p.insert_range(50..50), 0);
        let start = 50u64;
        let end = 10u64;
        assert_eq!(p.insert_range(start..end), 0);
        assert_eq!(p.len(), 1);
    }

    #[test]
    fn test_empty_iter_ranges_below_pruned_are_empty() {
        let mut p = Prunable::new();
        p.insert(200_000);
        p.prune(131_072);

        assert!(p.iter_range(50..50).next().is_none());
        let start = 50u64;
        let end = 10u64;
        assert!(p.iter_range(start..end).next().is_none());
    }

    #[test]
    fn test_iter() {
        let mut p = Prunable::new();
        for v in [200_000u64, 100, 70_000, 5] {
            p.insert(v);
        }
        p.prune(131_072);
        let collected: Vec<u64> = p.iter().collect();
        assert_eq!(collected, vec![200_000]);
    }

    #[test]
    fn test_iter_range_above_watermark() {
        let mut p = Prunable::new();
        p.insert(100);
        p.insert(200_000);
        p.insert(300_000);
        p.prune(131_072);

        // Only 200_000 falls inside the requested window.
        let collected: Vec<u64> = p.iter_range(150_000..250_000).collect();
        assert_eq!(collected, vec![200_000]);
    }

    #[test]
    fn test_min_max_after_prune() {
        let mut p = Prunable::new();
        assert_eq!(p.min(), None);
        assert_eq!(p.max(), None);

        p.insert(100);
        p.insert(200_000);
        p.insert(300_000);
        assert_eq!(p.min(), Some(100));
        assert_eq!(p.max(), Some(300_000));

        // Dropping container 0 moves the minimum up; the maximum is unaffected.
        p.prune(131_072);
        assert_eq!(p.min(), Some(200_000));
        assert_eq!(p.max(), Some(300_000));
    }

    #[test]
    fn test_clear_preserves_pruned_below() {
        let mut p = Prunable::new();
        p.insert(200_000);
        p.prune(131_072);
        p.clear();
        assert!(p.is_empty());
        assert_eq!(p.pruned_below(), 131_072);
    }

    #[test]
    fn test_from_iter_basic() {
        let p: Prunable = [5u64, 10, 100, 65_537].into_iter().collect();
        assert_eq!(p.len(), 4);
        assert_eq!(p.pruned_below(), 0);
        assert!(p.contains(5));
        assert!(p.contains(65_537));
    }

    #[test]
    fn test_from_iter_empty() {
        let p: Prunable = core::iter::empty::<u64>().collect();
        assert!(p.is_empty());
        assert_eq!(p.pruned_below(), 0);
    }

    #[test]
    fn test_extend_after_pruning() {
        let mut p = Prunable::new();
        p.insert(200_000);
        p.prune(131_072);

        // All values >= pruned_below, so extend works.
        p.extend([200_001u64, 300_000]);
        assert_eq!(p.len(), 3);
        assert!(p.contains(200_001));
    }

    #[test]
    #[should_panic(expected = "value pruned")]
    fn test_extend_below_pruned_panics() {
        let mut p = Prunable::new();
        p.prune(131_072);
        // 50 is below pruned_below; should panic via insert.
        p.extend([200_000u64, 50]);
    }

    #[test]
    fn test_codec_roundtrip_empty() {
        let p = Prunable::new();
        let encoded = p.encode();
        let decoded = Prunable::decode_cfg(encoded, &(..).into()).unwrap();
        assert_eq!(p, decoded);
    }

    #[test]
    fn test_codec_roundtrip_with_pruning() {
        let mut p = Prunable::new();
        p.insert(200_000);
        p.insert(300_000);
        p.insert(1_000_000);
        p.prune(131_072);
        let encoded = p.encode();
        let decoded = Prunable::decode_cfg(encoded, &(..).into()).unwrap();
        assert_eq!(p, decoded);
    }

    #[test]
    fn test_codec_rejects_misaligned_pruned_below() {
        use bytes::BytesMut;
        let mut buf = BytesMut::new();
        // Not a multiple of 65536.
        100u64.write(&mut buf);
        // Append an empty bitmap encoding.
        Bitmap::new().write(&mut buf);
        let result = Prunable::decode_cfg(buf.freeze(), &(..).into());
        assert!(matches!(
            result,
            Err(CodecError::Invalid("Prunable", msg))
                if msg.contains("multiple of 65536")
        ));
    }

    #[test]
    fn test_codec_rejects_container_below_pruned() {
        use bytes::BytesMut;
        let mut buf = BytesMut::new();
        // pruned_below = 131_072 (container 2 boundary).
        131_072u64.write(&mut buf);
        // Bitmap has a value in container 0, violating the invariant.
        let mut bitmap = Bitmap::new();
        bitmap.insert(50);
        bitmap.write(&mut buf);
        let result = Prunable::decode_cfg(buf.freeze(), &(..).into());
        assert!(matches!(
            result,
            Err(CodecError::Invalid("Prunable", msg))
                if msg.contains("container key below pruned_below")
        ));
    }

    #[test]
    fn test_encode_size_empty() {
        let p = Prunable::new();
        assert_eq!(
            p.encode_size(),
            0u64.encode_size() + Bitmap::new().encode_size()
        );
    }

    #[test]
    fn test_encode_size_grows_with_data() {
        let s0 = Prunable::new().encode_size();
        let mut p = Prunable::new();
        p.insert(100);
        p.insert(65_537);
        p.insert(131_073);
        assert!(p.encode_size() > s0);
    }

    #[test]
    fn test_encode_size_unaffected_by_pruning_alone() {
        // Pruning that removes nothing (threshold below all data) should not change encoding.
        let mut p = Prunable::new();
        p.insert(200_000);
        let before = p.encode_size();
        p.prune(0);
        assert_eq!(p.encode_size(), before);
    }

    #[cfg(feature = "arbitrary")]
    mod conformance {
        use commonware_codec::conformance::CodecConformance;

        commonware_conformance::conformance_tests! {
            CodecConformance<super::Prunable>,
        }
    }
}